opentelemetry_sdk/metrics/
mod.rs

1//! The crust of the OpenTelemetry metrics SDK.
2//!
3//! ## Configuration
4//!
5//! The metrics SDK configuration is stored with each [SdkMeterProvider].
6//! Configuration for [Resource]s, views, and [ManualReader] or
7//! [PeriodicReader] instances can be specified.
8//!
9//! ### Example
10//!
11//! ```
12//! use opentelemetry::global;
13//! use opentelemetry::KeyValue;
14//! use opentelemetry_sdk::{metrics::SdkMeterProvider, Resource};
15//!
16//! // Generate SDK configuration, resource, views, etc
17//! let resource = Resource::builder().build(); // default attributes about the current process
18//!
19//! // Create a meter provider with the desired config
20//! let meter_provider = SdkMeterProvider::builder().with_resource(resource).build();
21//! global::set_meter_provider(meter_provider.clone());
22//!
23//! // Use the meter provider to create meter instances
24//! let meter = global::meter("my_app");
25//!
26//! // Create instruments scoped to the meter
27//! let counter = meter
28//!     .u64_counter("power_consumption")
29//!     .with_unit("kWh")
30//!     .build();
31//!
32//! // use instruments to record measurements
33//! counter.add(10, &[KeyValue::new("rate", "standard")]);
34//!
35//! // shutdown the provider at the end of the application to ensure any metrics not yet
36//! // exported are flushed.
37//! meter_provider.shutdown().unwrap();
38//! ```
39//!
40//! [Resource]: crate::Resource
41
42#[allow(unreachable_pub)]
43#[allow(unused)]
44pub(crate) mod aggregation;
45pub mod data;
46mod error;
47pub mod exporter;
48pub(crate) mod instrument;
49pub(crate) mod internal;
50#[cfg(feature = "experimental_metrics_custom_reader")]
51pub(crate) mod manual_reader;
52pub(crate) mod meter;
53mod meter_provider;
54pub(crate) mod noop;
55pub(crate) mod periodic_reader;
56#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
57/// Module for periodic reader with async runtime.
58pub mod periodic_reader_with_async_runtime;
59pub(crate) mod pipeline;
60#[cfg(feature = "experimental_metrics_custom_reader")]
61pub mod reader;
62#[cfg(not(feature = "experimental_metrics_custom_reader"))]
63pub(crate) mod reader;
64pub(crate) mod view;
65
66/// In-Memory metric exporter for testing purpose.
67#[cfg(any(feature = "testing", test))]
68#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
69pub mod in_memory_exporter;
70#[cfg(any(feature = "testing", test))]
71#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
72pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
73
74#[cfg(feature = "spec_unstable_metrics_views")]
75pub use aggregation::*;
76#[cfg(feature = "experimental_metrics_custom_reader")]
77pub use manual_reader::*;
78pub use meter_provider::*;
79pub use periodic_reader::*;
80#[cfg(feature = "experimental_metrics_custom_reader")]
81pub use pipeline::Pipeline;
82
83pub use instrument::{Instrument, InstrumentKind, Stream, StreamBuilder};
84
85use std::hash::Hash;
86
/// The time window over which an aggregation is computed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum Temporality {
    /// The interval starts at a fixed point and keeps growing forward in time.
    ///
    /// Each new measurement is added to everything recorded since the start time.
    /// This is the default variant.
    #[default]
    Cumulative,

    /// The interval is reset on every cycle.
    ///
    /// Measurements of one cycle are recorded on their own and are not
    /// affected by measurements of any other cycle.
    Delta,

    /// Makes Synchronous Counter and Histogram instruments use Delta
    /// aggregation temporality, so that they can shed memory after a
    /// cardinality explosion and therefore use less memory.
    LowMemory,
}
109
110#[cfg(all(test, feature = "testing"))]
111mod tests {
112    use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
113    use super::data::MetricData;
114    use super::internal::Number;
115    use super::*;
116    use crate::metrics::data::ResourceMetrics;
117    use crate::metrics::internal::AggregatedMetricsAccess;
118    use crate::metrics::InMemoryMetricExporter;
119    use crate::metrics::InMemoryMetricExporterBuilder;
120    use data::GaugeDataPoint;
121    use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
122    use opentelemetry::InstrumentationScope;
123    use opentelemetry::Value;
124    use opentelemetry::{metrics::MeterProvider as _, KeyValue};
125    use rand::{rngs, Rng, SeedableRng};
126    use std::cmp::{max, min};
127    use std::sync::atomic::{AtomicBool, Ordering};
128    use std::sync::{Arc, Mutex};
129    use std::thread;
130    use std::time::Duration;
131
132    // Run all tests in this mod
133    // cargo test metrics::tests --features=testing,spec_unstable_metrics_views
134    // Note for all tests from this point onwards in this mod:
135    // "multi_thread" tokio flavor must be used else flush won't
136    // be able to make progress!
137
138    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
139    #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
140    async fn invalid_instrument_config_noops() {
141        // Run this test with stdout enabled to see output.
142        // cargo test invalid_instrument_config_noops --features=testing,spec_unstable_metrics_views -- --nocapture
143        let invalid_instrument_names = vec![
144            "_startWithNoneAlphabet",
145            "utf8char锈",
146            "a".repeat(256).leak(),
147            "invalid name",
148        ];
149        for name in invalid_instrument_names {
150            let test_context = TestContext::new(Temporality::Cumulative);
151            let counter = test_context.meter().u64_counter(name).build();
152            counter.add(1, &[]);
153
154            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
155            up_down_counter.add(1, &[]);
156
157            let gauge = test_context.meter().f64_gauge(name).build();
158            gauge.record(1.9, &[]);
159
160            let histogram = test_context.meter().f64_histogram(name).build();
161            histogram.record(1.0, &[]);
162
163            let _observable_counter = test_context
164                .meter()
165                .u64_observable_counter(name)
166                .with_callback(move |observer| {
167                    observer.observe(1, &[]);
168                })
169                .build();
170
171            let _observable_gauge = test_context
172                .meter()
173                .f64_observable_gauge(name)
174                .with_callback(move |observer| {
175                    observer.observe(1.0, &[]);
176                })
177                .build();
178
179            let _observable_up_down_counter = test_context
180                .meter()
181                .i64_observable_up_down_counter(name)
182                .with_callback(move |observer| {
183                    observer.observe(1, &[]);
184                })
185                .build();
186
187            test_context.flush_metrics();
188
189            // As instrument name is invalid, no metrics should be exported
190            test_context.check_no_metrics();
191        }
192
193        let invalid_bucket_boundaries = vec![
194            vec![1.0, 1.0],                          // duplicate boundaries
195            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non consequent boundaries
196            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
197            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
198            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
199            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
200        ];
201        for bucket_boundaries in invalid_bucket_boundaries {
202            let test_context = TestContext::new(Temporality::Cumulative);
203            let histogram = test_context
204                .meter()
205                .f64_histogram("test")
206                .with_boundaries(bucket_boundaries)
207                .build();
208            histogram.record(1.9, &[]);
209            test_context.flush_metrics();
210
211            // As bucket boundaries provided via advisory params are invalid,
212            // no metrics should be exported
213            test_context.check_no_metrics();
214        }
215    }
216
217    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
218    #[cfg(feature = "experimental_metrics_disable_name_validation")]
219    async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
220        // Run this test with stdout enabled to see output.
221        // cargo test valid_instrument_config_with_feature_experimental_metrics_disable_name_validation --all-features -- --nocapture
222        let invalid_instrument_names = vec![
223            "_startWithNoneAlphabet",
224            "utf8char锈",
225            "",
226            "a".repeat(256).leak(),
227            "\\allow\\slash /sec",
228            "\\allow\\$$slash /sec",
229            "Total $ Count",
230            "\\test\\UsagePercent(Total) > 80%",
231            "invalid name",
232        ];
233        for name in invalid_instrument_names {
234            let test_context = TestContext::new(Temporality::Cumulative);
235            let counter = test_context.meter().u64_counter(name).build();
236            counter.add(1, &[]);
237
238            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
239            up_down_counter.add(1, &[]);
240
241            let gauge = test_context.meter().f64_gauge(name).build();
242            gauge.record(1.9, &[]);
243
244            let histogram = test_context.meter().f64_histogram(name).build();
245            histogram.record(1.0, &[]);
246
247            let _observable_counter = test_context
248                .meter()
249                .u64_observable_counter(name)
250                .with_callback(move |observer| {
251                    observer.observe(1, &[]);
252                })
253                .build();
254
255            let _observable_gauge = test_context
256                .meter()
257                .f64_observable_gauge(name)
258                .with_callback(move |observer| {
259                    observer.observe(1.0, &[]);
260                })
261                .build();
262
263            let _observable_up_down_counter = test_context
264                .meter()
265                .i64_observable_up_down_counter(name)
266                .with_callback(move |observer| {
267                    observer.observe(1, &[]);
268                })
269                .build();
270
271            test_context.flush_metrics();
272
273            // As instrument name are valid because of the feature flag, metrics should be exported
274            let resource_metrics = test_context
275                .exporter
276                .get_finished_metrics()
277                .expect("metrics expected to be exported");
278
279            assert!(!resource_metrics.is_empty(), "metrics should be exported");
280        }
281
282        // Ensuring that the Histograms with invalid bucket boundaries are not exported
283        // when using the feature flag
284        let invalid_bucket_boundaries = vec![
285            vec![1.0, 1.0],                          // duplicate boundaries
286            vec![1.0, 2.0, 3.0, 2.0],                // duplicate non consequent boundaries
287            vec![1.0, 2.0, 3.0, 4.0, 2.5],           // unsorted boundaries
288            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], // boundaries with positive infinity
289            vec![1.0, 2.0, 3.0, f64::NAN],           // boundaries with NaNs
290            vec![f64::NEG_INFINITY, 2.0, 3.0],       // boundaries with negative infinity
291        ];
292        for bucket_boundaries in invalid_bucket_boundaries {
293            let test_context = TestContext::new(Temporality::Cumulative);
294            let histogram = test_context
295                .meter()
296                .f64_histogram("test")
297                .with_boundaries(bucket_boundaries)
298                .build();
299            histogram.record(1.9, &[]);
300            test_context.flush_metrics();
301
302            // As bucket boundaries provided via advisory params are invalid,
303            // no metrics should be exported
304            test_context.check_no_metrics();
305        }
306    }
307
308    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
309    async fn counter_aggregation_delta() {
310        // Run this test with stdout enabled to see output.
311        // cargo test counter_aggregation_delta --features=testing -- --nocapture
312        counter_aggregation_helper(Temporality::Delta);
313    }
314
315    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
316    async fn counter_aggregation_cumulative() {
317        // Run this test with stdout enabled to see output.
318        // cargo test counter_aggregation_cumulative --features=testing -- --nocapture
319        counter_aggregation_helper(Temporality::Cumulative);
320    }
321
322    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
323    async fn counter_aggregation_no_attributes_cumulative() {
324        let mut test_context = TestContext::new(Temporality::Cumulative);
325        let counter = test_context.u64_counter("test", "my_counter", None);
326
327        counter.add(50, &[]);
328        test_context.flush_metrics();
329
330        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
331            unreachable!()
332        };
333
334        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
335        assert!(sum.is_monotonic, "Should produce monotonic.");
336        assert_eq!(
337            sum.temporality,
338            Temporality::Cumulative,
339            "Should produce cumulative"
340        );
341
342        let data_point = &sum.data_points[0];
343        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
344        assert_eq!(data_point.value, 50, "Unexpected data point value");
345    }
346
347    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
348    async fn counter_aggregation_no_attributes_delta() {
349        let mut test_context = TestContext::new(Temporality::Delta);
350        let counter = test_context.u64_counter("test", "my_counter", None);
351
352        counter.add(50, &[]);
353        test_context.flush_metrics();
354
355        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
356            unreachable!()
357        };
358
359        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
360        assert!(sum.is_monotonic, "Should produce monotonic.");
361        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
362
363        let data_point = &sum.data_points[0];
364        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
365        assert_eq!(data_point.value, 50, "Unexpected data point value");
366    }
367
368    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
369    async fn counter_aggregation_overflow_delta() {
370        counter_aggregation_overflow_helper(Temporality::Delta);
371        counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
372    }
373
374    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
375    async fn counter_aggregation_overflow_cumulative() {
376        counter_aggregation_overflow_helper(Temporality::Cumulative);
377        counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
378    }
379
380    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
381    async fn counter_aggregation_attribute_order_sorted_first_delta() {
382        // Run this test with stdout enabled to see output.
383        // cargo test counter_aggregation_attribute_order_sorted_first_delta --features=testing -- --nocapture
384        counter_aggregation_attribute_order_helper(Temporality::Delta, true);
385    }
386
387    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
388    async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
389        // Run this test with stdout enabled to see output.
390        // cargo test counter_aggregation_attribute_order_sorted_first_cumulative --features=testing -- --nocapture
391        counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
392    }
393
394    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
395    async fn counter_aggregation_attribute_order_unsorted_first_delta() {
396        // Run this test with stdout enabled to see output.
397        // cargo test counter_aggregation_attribute_order_unsorted_first_delta --features=testing -- --nocapture
398
399        counter_aggregation_attribute_order_helper(Temporality::Delta, false);
400    }
401
402    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
403    async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
404        // Run this test with stdout enabled to see output.
405        // cargo test counter_aggregation_attribute_order_unsorted_first_cumulative --features=testing -- --nocapture
406
407        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
408    }
409
410    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
411    async fn histogram_aggregation_cumulative() {
412        // Run this test with stdout enabled to see output.
413        // cargo test histogram_aggregation_cumulative --features=testing -- --nocapture
414        histogram_aggregation_helper(Temporality::Cumulative);
415    }
416
417    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
418    async fn histogram_aggregation_delta() {
419        // Run this test with stdout enabled to see output.
420        // cargo test histogram_aggregation_delta --features=testing -- --nocapture
421        histogram_aggregation_helper(Temporality::Delta);
422    }
423
424    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
425    async fn histogram_aggregation_with_custom_bounds() {
426        // Run this test with stdout enabled to see output.
427        // cargo test histogram_aggregation_with_custom_bounds --features=testing -- --nocapture
428        histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
429        histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
430    }
431
432    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
433    async fn histogram_aggregation_with_empty_bounds() {
434        // Run this test with stdout enabled to see output.
435        // cargo test histogram_aggregation_with_empty_bounds --features=testing -- --nocapture
436        histogram_aggregation_with_empty_bounds_helper(Temporality::Delta);
437        histogram_aggregation_with_empty_bounds_helper(Temporality::Cumulative);
438    }
439
440    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
441    async fn updown_counter_aggregation_cumulative() {
442        // Run this test with stdout enabled to see output.
443        // cargo test updown_counter_aggregation_cumulative --features=testing -- --nocapture
444        updown_counter_aggregation_helper(Temporality::Cumulative);
445    }
446
447    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
448    async fn updown_counter_aggregation_delta() {
449        // Run this test with stdout enabled to see output.
450        // cargo test updown_counter_aggregation_delta --features=testing -- --nocapture
451        updown_counter_aggregation_helper(Temporality::Delta);
452    }
453
454    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
455    async fn gauge_aggregation() {
456        // Run this test with stdout enabled to see output.
457        // cargo test gauge_aggregation --features=testing -- --nocapture
458
459        // Gauge should use last value aggregation regardless of the aggregation temporality used.
460        gauge_aggregation_helper(Temporality::Delta);
461        gauge_aggregation_helper(Temporality::Cumulative);
462    }
463
464    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
465    async fn observable_gauge_aggregation() {
466        // Run this test with stdout enabled to see output.
467        // cargo test observable_gauge_aggregation --features=testing -- --nocapture
468
469        // Gauge should use last value aggregation regardless of the aggregation temporality used.
470        observable_gauge_aggregation_helper(Temporality::Delta, false);
471        observable_gauge_aggregation_helper(Temporality::Delta, true);
472        observable_gauge_aggregation_helper(Temporality::Cumulative, false);
473        observable_gauge_aggregation_helper(Temporality::Cumulative, true);
474    }
475
476    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
477    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
478        // Run this test with stdout enabled to see output.
479        // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
480        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
481    }
482
483    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
484    async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
485        // Run this test with stdout enabled to see output.
486        // cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture
487        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
488    }
489
490    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
491    async fn observable_counter_aggregation_delta_non_zero_increment() {
492        // Run this test with stdout enabled to see output.
493        // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
494        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
495    }
496
497    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
498    async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
499        // Run this test with stdout enabled to see output.
500        // cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture
501        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
502    }
503
504    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
505    async fn observable_counter_aggregation_cumulative_zero_increment() {
506        // Run this test with stdout enabled to see output.
507        // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
508        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
509    }
510
511    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
512    async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
513        // Run this test with stdout enabled to see output.
514        // cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture
515        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
516    }
517
518    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
519    async fn observable_counter_aggregation_delta_zero_increment() {
520        // Run this test with stdout enabled to see output.
521        // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
522        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
523    }
524
525    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
526    async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
527        // Run this test with stdout enabled to see output.
528        // cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture
529        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
530    }
531
532    fn observable_counter_aggregation_helper(
533        temporality: Temporality,
534        start: u64,
535        increment: u64,
536        length: u64,
537        is_empty_attributes: bool,
538    ) {
539        // Arrange
540        let mut test_context = TestContext::new(temporality);
541        let attributes = if is_empty_attributes {
542            vec![]
543        } else {
544            vec![KeyValue::new("key1", "value1")]
545        };
546        // The Observable counter reports values[0], values[1],....values[n] on each flush.
547        let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
548        println!("Testing with observable values: {:?}", values);
549        let values = Arc::new(values);
550        let values_clone = values.clone();
551        let i = Arc::new(Mutex::new(0));
552        let _observable_counter = test_context
553            .meter()
554            .u64_observable_counter("my_observable_counter")
555            .with_unit("my_unit")
556            .with_callback(move |observer| {
557                let mut index = i.lock().unwrap();
558                if *index < values.len() {
559                    observer.observe(values[*index], &attributes);
560                    *index += 1;
561                }
562            })
563            .build();
564
565        for (iter, v) in values_clone.iter().enumerate() {
566            test_context.flush_metrics();
567            let MetricData::Sum(sum) =
568                test_context.get_aggregation::<u64>("my_observable_counter", None)
569            else {
570                unreachable!()
571            };
572            assert_eq!(sum.data_points.len(), 1);
573            assert!(sum.is_monotonic, "Counter should produce monotonic.");
574            if let Temporality::Cumulative = temporality {
575                assert_eq!(
576                    sum.temporality,
577                    Temporality::Cumulative,
578                    "Should produce cumulative"
579                );
580            } else {
581                assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
582            }
583
584            // find and validate datapoint
585            let data_point = if is_empty_attributes {
586                &sum.data_points[0]
587            } else {
588                find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
589                    .expect("datapoint with key1=value1 expected")
590            };
591
592            if let Temporality::Cumulative = temporality {
593                // Cumulative counter should have the value as is.
594                assert_eq!(data_point.value, *v);
595            } else {
596                // Delta counter should have the increment value.
597                // Except for the first value which should be the start value.
598                if iter == 0 {
599                    assert_eq!(data_point.value, start);
600                } else {
601                    assert_eq!(data_point.value, increment);
602                }
603            }
604
605            test_context.reset_metrics();
606        }
607    }
608
609    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
610    async fn empty_meter_name_retained() {
611        async fn meter_name_retained_helper(
612            meter: Meter,
613            provider: SdkMeterProvider,
614            exporter: InMemoryMetricExporter,
615        ) {
616            // Act
617            let counter = meter.u64_counter("my_counter").build();
618
619            counter.add(10, &[]);
620            provider.force_flush().unwrap();
621
622            // Assert
623            let resource_metrics = exporter
624                .get_finished_metrics()
625                .expect("metrics are expected to be exported.");
626            assert!(
627                resource_metrics[0].scope_metrics[0].metrics.len() == 1,
628                "There should be a single metric"
629            );
630            let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
631            assert_eq!(meter_name, "");
632        }
633
634        let exporter = InMemoryMetricExporter::default();
635        let meter_provider = SdkMeterProvider::builder()
636            .with_periodic_exporter(exporter.clone())
637            .build();
638
639        // Test Meter creation in 2 ways, both with empty string as meter name
640        let meter1 = meter_provider.meter("");
641        meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;
642
643        let meter_scope = InstrumentationScope::builder("").build();
644        let meter2 = meter_provider.meter_with_scope(meter_scope);
645        meter_name_retained_helper(meter2, meter_provider, exporter).await;
646    }
647
648    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
649    async fn counter_duplicate_instrument_merge() {
650        // Arrange
651        let exporter = InMemoryMetricExporter::default();
652        let meter_provider = SdkMeterProvider::builder()
653            .with_periodic_exporter(exporter.clone())
654            .build();
655
656        // Act
657        let meter = meter_provider.meter("test");
658        let counter = meter
659            .u64_counter("my_counter")
660            .with_unit("my_unit")
661            .with_description("my_description")
662            .build();
663
664        let counter_duplicated = meter
665            .u64_counter("my_counter")
666            .with_unit("my_unit")
667            .with_description("my_description")
668            .build();
669
670        let attribute = vec![KeyValue::new("key1", "value1")];
671        counter.add(10, &attribute);
672        counter_duplicated.add(5, &attribute);
673
674        meter_provider.force_flush().unwrap();
675
676        // Assert
677        let resource_metrics = exporter
678            .get_finished_metrics()
679            .expect("metrics are expected to be exported.");
680        assert!(
681            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
682            "There should be single metric merging duplicate instruments"
683        );
684        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
685        assert_eq!(metric.name, "my_counter");
686        assert_eq!(metric.unit, "my_unit");
687        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
688            .expect("Sum aggregation expected for Counter instruments by default")
689        else {
690            unreachable!()
691        };
692
693        // Expecting 1 time-series.
694        assert_eq!(sum.data_points.len(), 1);
695
696        let datapoint = &sum.data_points[0];
697        assert_eq!(datapoint.value, 15);
698    }
699
700    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
701    async fn counter_duplicate_instrument_different_meter_no_merge() {
702        // Arrange
703        let exporter = InMemoryMetricExporter::default();
704        let meter_provider = SdkMeterProvider::builder()
705            .with_periodic_exporter(exporter.clone())
706            .build();
707
708        // Act
709        let meter1 = meter_provider.meter("test.meter1");
710        let meter2 = meter_provider.meter("test.meter2");
711        let counter1 = meter1
712            .u64_counter("my_counter")
713            .with_unit("my_unit")
714            .with_description("my_description")
715            .build();
716
717        let counter2 = meter2
718            .u64_counter("my_counter")
719            .with_unit("my_unit")
720            .with_description("my_description")
721            .build();
722
723        let attribute = vec![KeyValue::new("key1", "value1")];
724        counter1.add(10, &attribute);
725        counter2.add(5, &attribute);
726
727        meter_provider.force_flush().unwrap();
728
729        // Assert
730        let resource_metrics = exporter
731            .get_finished_metrics()
732            .expect("metrics are expected to be exported.");
733        assert!(
734            resource_metrics[0].scope_metrics.len() == 2,
735            "There should be 2 separate scope"
736        );
737        assert!(
738            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
739            "There should be single metric for the scope"
740        );
741        assert!(
742            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
743            "There should be single metric for the scope"
744        );
745
746        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
747        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
748
749        if let Some(scope1) = scope1 {
750            let metric1 = &scope1.metrics[0];
751            assert_eq!(metric1.name, "my_counter");
752            assert_eq!(metric1.unit, "my_unit");
753            assert_eq!(metric1.description, "my_description");
754            let MetricData::Sum(sum1) = u64::extract_metrics_data_ref(&metric1.data)
755                .expect("Sum aggregation expected for Counter instruments by default")
756            else {
757                unreachable!()
758            };
759
760            // Expecting 1 time-series.
761            assert_eq!(sum1.data_points.len(), 1);
762
763            let datapoint1 = &sum1.data_points[0];
764            assert_eq!(datapoint1.value, 10);
765        } else {
766            panic!("No MetricScope found for 'test.meter1'");
767        }
768
769        if let Some(scope2) = scope2 {
770            let metric2 = &scope2.metrics[0];
771            assert_eq!(metric2.name, "my_counter");
772            assert_eq!(metric2.unit, "my_unit");
773            assert_eq!(metric2.description, "my_description");
774
775            let MetricData::Sum(sum2) = u64::extract_metrics_data_ref(&metric2.data)
776                .expect("Sum aggregation expected for Counter instruments by default")
777            else {
778                unreachable!()
779            };
780
781            // Expecting 1 time-series.
782            assert_eq!(sum2.data_points.len(), 1);
783
784            let datapoint2 = &sum2.data_points[0];
785            assert_eq!(datapoint2.value, 5);
786        } else {
787            panic!("No MetricScope found for 'test.meter2'");
788        }
789    }
790
791    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
792    async fn instrumentation_scope_identity_test() {
793        // Arrange
794        let exporter = InMemoryMetricExporter::default();
795        let meter_provider = SdkMeterProvider::builder()
796            .with_periodic_exporter(exporter.clone())
797            .build();
798
799        // Act
800        // Meters are identical.
801        // Hence there should be a single metric stream output for this test.
802        let make_scope = |attributes| {
803            InstrumentationScope::builder("test.meter")
804                .with_version("v0.1.0")
805                .with_schema_url("http://example.com")
806                .with_attributes(attributes)
807                .build()
808        };
809
810        let meter1 =
811            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
812        let meter2 =
813            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
814
815        let counter1 = meter1
816            .u64_counter("my_counter")
817            .with_unit("my_unit")
818            .with_description("my_description")
819            .build();
820
821        let counter2 = meter2
822            .u64_counter("my_counter")
823            .with_unit("my_unit")
824            .with_description("my_description")
825            .build();
826
827        let attribute = vec![KeyValue::new("key1", "value1")];
828        counter1.add(10, &attribute);
829        counter2.add(5, &attribute);
830
831        meter_provider.force_flush().unwrap();
832
833        // Assert
834        let resource_metrics = exporter
835            .get_finished_metrics()
836            .expect("metrics are expected to be exported.");
837        println!("resource_metrics: {:?}", resource_metrics);
838        assert!(
839            resource_metrics[0].scope_metrics.len() == 1,
840            "There should be a single scope as the meters are identical"
841        );
842        assert!(
843            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
844            "There should be single metric for the scope as instruments are identical"
845        );
846
847        let scope = &resource_metrics[0].scope_metrics[0].scope;
848        assert_eq!(scope.name(), "test.meter");
849        assert_eq!(scope.version(), Some("v0.1.0"));
850        assert_eq!(scope.schema_url(), Some("http://example.com"));
851
852        // This is validating current behavior, but it is not guaranteed to be the case in the future,
853        // as this is a user error and SDK reserves right to change this behavior.
854        assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));
855
856        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
857        assert_eq!(metric.name, "my_counter");
858        assert_eq!(metric.unit, "my_unit");
859        assert_eq!(metric.description, "my_description");
860
861        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
862            .expect("Sum aggregation expected for Counter instruments by default")
863        else {
864            unreachable!()
865        };
866
867        // Expecting 1 time-series.
868        assert_eq!(sum.data_points.len(), 1);
869
870        let datapoint = &sum.data_points[0];
871        assert_eq!(datapoint.value, 15);
872    }
873
/// Registers a view whose explicit-bucket boundaries are invalid and which
/// also tries to rename the histogram and its unit. The exported metric is
/// expected to keep the instrument's original name and unit.
#[cfg(feature = "spec_unstable_metrics_views")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
    // Run this test with stdout enabled to see output.
    // cargo test histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist --features=testing -- --nocapture

    // Arrange
    let exporter = InMemoryMetricExporter::default();
    let view = |instrument: &Instrument| {
        // Only the histogram under test gets a custom stream.
        if instrument.name != "test_histogram" {
            return None;
        }
        let invalid_aggregation = aggregation::Aggregation::ExplicitBucketHistogram {
            boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], // invalid boundaries
            record_min_max: false,
        };
        // A failed build is turned into `None` by `.ok()`.
        Stream::builder()
            .with_aggregation(invalid_aggregation)
            .with_name("test_histogram_renamed")
            .with_unit("test_unit_renamed")
            .build()
            .ok()
    };
    let meter_provider = SdkMeterProvider::builder()
        .with_periodic_exporter(exporter.clone())
        .with_view(view)
        .build();

    // Act
    let histogram = meter_provider
        .meter("test")
        .f64_histogram("test_histogram")
        .with_unit("test_unit")
        .build();

    histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());

    let exported = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(
        exported.name, "test_histogram",
        "View rename should be ignored and original name retained."
    );
    assert_eq!(
        exported.unit, "test_unit",
        "View rename of unit should be ignored and original unit retained."
    );
}
927
/// An observable counter reports three observations of 100 with distinct
/// attribute sets while a view allows no attribute keys. The desired outcome
/// is a single data point with value 300; the test is currently ignored
/// because spatial aggregation is not implemented.
#[cfg(feature = "spec_unstable_metrics_views")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
    // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing

    // Arrange
    let exporter = InMemoryMetricExporter::default();
    // The view keeps no attribute keys for the instrument under test.
    let view = |instrument: &Instrument| {
        if instrument.name != "my_observable_counter" {
            return None;
        }
        Stream::builder()
            .with_allowed_attribute_keys(vec![])
            .build()
            .ok()
    };
    let meter_provider = SdkMeterProvider::builder()
        .with_periodic_exporter(exporter.clone())
        .with_view(view)
        .build();

    // Act
    let meter = meter_provider.meter("test");
    let _observable_counter = meter
        .u64_observable_counter("my_observable_counter")
        .with_callback(|observer| {
            // (statusCode, verb) combinations, each observed with value 100.
            for (status_code, verb) in [("200", "get"), ("200", "post"), ("500", "get")] {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", status_code),
                        KeyValue::new("verb", verb),
                    ],
                );
            }
        })
        .build();

    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter",);

    let sum = match u64::extract_metrics_data_ref(&metric.data)
        .expect("Sum aggregation expected for ObservableCounter instruments by default")
    {
        MetricData::Sum(sum) => sum,
        _ => unreachable!(),
    };

    // With every attribute dropped by the view, only one time-series should
    // remain. This is failing today, due to lack of support for spatial
    // aggregation.
    assert_eq!(sum.data_points.len(), 1);

    // The single data point should hold the total of all observations.
    let total_point = &sum.data_points[0];
    assert_eq!(total_point.value, 300);
}
1008
/// A counter records 10 three times with distinct attribute sets while a view
/// allows no attribute keys. The export is expected to contain a single data
/// point with the combined value 30.
#[cfg(feature = "spec_unstable_metrics_views")]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn spatial_aggregation_when_view_drops_attributes_counter() {
    // cargo test spatial_aggregation_when_view_drops_attributes_counter --features=testing

    // Arrange
    let exporter = InMemoryMetricExporter::default();
    // The view keeps no attribute keys for the instrument under test.
    let view = |instrument: &Instrument| {
        if instrument.name != "my_counter" {
            return None;
        }
        let stream = Stream::builder()
            .with_allowed_attribute_keys(vec![])
            .build()
            .unwrap();
        Some(stream)
    };
    let meter_provider = SdkMeterProvider::builder()
        .with_periodic_exporter(exporter.clone())
        .with_view(view)
        .build();

    // Act
    let meter = meter_provider.meter("test");
    let counter = meter.u64_counter("my_counter").build();

    // Without the view these three attribute sets would yield 3 time-series;
    // with all attributes dropped only 1 is expected.
    for (status_code, verb) in [("200", "Get"), ("500", "Get"), ("200", "Post")] {
        counter.add(
            10,
            &[
                KeyValue::new("statusCode", status_code),
                KeyValue::new("verb", verb),
            ],
        );
    }

    meter_provider.force_flush().unwrap();

    // Assert
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_counter",);

    let sum = match u64::extract_metrics_data_ref(&metric.data)
        .expect("Sum aggregation expected for Counter instruments by default")
    {
        MetricData::Sum(sum) => sum,
        _ => unreachable!(),
    };

    // The view drops every attribute, so all measurements must collapse into
    // a single time-series.
    assert_eq!(sum.data_points.len(), 1);
    // That single data point carries the total of the three additions.
    let total_point = &sum.data_points[0];
    assert_eq!(total_point.value, 30);
}
1091
1092    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1093    async fn no_attr_cumulative_up_down_counter() {
1094        let mut test_context = TestContext::new(Temporality::Cumulative);
1095        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1096
1097        counter.add(50, &[]);
1098        test_context.flush_metrics();
1099
1100        let MetricData::Sum(sum) =
1101            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1102        else {
1103            unreachable!()
1104        };
1105
1106        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1107        assert!(!sum.is_monotonic, "Should not produce monotonic.");
1108        assert_eq!(
1109            sum.temporality,
1110            Temporality::Cumulative,
1111            "Should produce cumulative"
1112        );
1113
1114        let data_point = &sum.data_points[0];
1115        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1116        assert_eq!(data_point.value, 50, "Unexpected data point value");
1117    }
1118
1119    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1120    async fn no_attr_up_down_counter_always_cumulative() {
1121        let mut test_context = TestContext::new(Temporality::Delta);
1122        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1123
1124        counter.add(50, &[]);
1125        test_context.flush_metrics();
1126
1127        let MetricData::Sum(sum) =
1128            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1129        else {
1130            unreachable!()
1131        };
1132
1133        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1134        assert!(!sum.is_monotonic, "Should not produce monotonic.");
1135        assert_eq!(
1136            sum.temporality,
1137            Temporality::Cumulative,
1138            "Should produce Cumulative due to UpDownCounter temporality_preference"
1139        );
1140
1141        let data_point = &sum.data_points[0];
1142        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1143        assert_eq!(data_point.value, 50, "Unexpected data point value");
1144    }
1145
1146    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1147    async fn no_attr_cumulative_counter_value_added_after_export() {
1148        let mut test_context = TestContext::new(Temporality::Cumulative);
1149        let counter = test_context.u64_counter("test", "my_counter", None);
1150
1151        counter.add(50, &[]);
1152        test_context.flush_metrics();
1153        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1154        test_context.reset_metrics();
1155
1156        counter.add(5, &[]);
1157        test_context.flush_metrics();
1158        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1159            unreachable!()
1160        };
1161
1162        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1163        assert!(sum.is_monotonic, "Should produce monotonic.");
1164        assert_eq!(
1165            sum.temporality,
1166            Temporality::Cumulative,
1167            "Should produce cumulative"
1168        );
1169
1170        let data_point = &sum.data_points[0];
1171        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1172        assert_eq!(data_point.value, 55, "Unexpected data point value");
1173    }
1174
1175    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1176    async fn no_attr_delta_counter_value_reset_after_export() {
1177        let mut test_context = TestContext::new(Temporality::Delta);
1178        let counter = test_context.u64_counter("test", "my_counter", None);
1179
1180        counter.add(50, &[]);
1181        test_context.flush_metrics();
1182        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1183        test_context.reset_metrics();
1184
1185        counter.add(5, &[]);
1186        test_context.flush_metrics();
1187        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1188            unreachable!()
1189        };
1190
1191        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1192        assert!(sum.is_monotonic, "Should produce monotonic.");
1193        assert_eq!(
1194            sum.temporality,
1195            Temporality::Delta,
1196            "Should produce cumulative"
1197        );
1198
1199        let data_point = &sum.data_points[0];
1200        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1201        assert_eq!(data_point.value, 5, "Unexpected data point value");
1202    }
1203
1204    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1205    async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
1206        let mut test_context = TestContext::new(Temporality::Delta);
1207        let counter = test_context.u64_counter("test", "my_counter", None);
1208
1209        counter.add(50, &[]);
1210        test_context.flush_metrics();
1211        let _ = test_context.get_aggregation::<u64>("my_counter", None);
1212        test_context.reset_metrics();
1213
1214        counter.add(50, &[KeyValue::new("a", "b")]);
1215        test_context.flush_metrics();
1216        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1217            unreachable!()
1218        };
1219
1220        let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
1221
1222        assert!(
1223            no_attr_data_point.is_none(),
1224            "Expected no data points with no attributes"
1225        );
1226    }
1227
1228    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1229    async fn delta_memory_efficiency_test() {
1230        // Run this test with stdout enabled to see output.
1231        // cargo test delta_memory_efficiency_test --features=testing -- --nocapture
1232
1233        // Arrange
1234        let mut test_context = TestContext::new(Temporality::Delta);
1235        let counter = test_context.u64_counter("test", "my_counter", None);
1236
1237        // Act
1238        counter.add(1, &[KeyValue::new("key1", "value1")]);
1239        counter.add(1, &[KeyValue::new("key1", "value1")]);
1240        counter.add(1, &[KeyValue::new("key1", "value1")]);
1241        counter.add(1, &[KeyValue::new("key1", "value1")]);
1242        counter.add(1, &[KeyValue::new("key1", "value1")]);
1243
1244        counter.add(1, &[KeyValue::new("key1", "value2")]);
1245        counter.add(1, &[KeyValue::new("key1", "value2")]);
1246        counter.add(1, &[KeyValue::new("key1", "value2")]);
1247        test_context.flush_metrics();
1248
1249        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1250            unreachable!()
1251        };
1252
1253        // Expecting 2 time-series.
1254        assert_eq!(sum.data_points.len(), 2);
1255
1256        // find and validate key1=value1 datapoint
1257        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1258            .expect("datapoint with key1=value1 expected");
1259        assert_eq!(data_point1.value, 5);
1260
1261        // find and validate key1=value2 datapoint
1262        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1263            .expect("datapoint with key1=value2 expected");
1264        assert_eq!(data_point1.value, 3);
1265
1266        test_context.exporter.reset();
1267        // flush again, and validate that nothing is flushed
1268        // as delta temporality.
1269        test_context.flush_metrics();
1270
1271        let resource_metrics = test_context
1272            .exporter
1273            .get_finished_metrics()
1274            .expect("metrics are expected to be exported.");
1275        println!("resource_metrics: {:?}", resource_metrics);
1276        assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1277    }
1278
1279    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1280    async fn counter_multithreaded() {
1281        // Run this test with stdout enabled to see output.
1282        // cargo test counter_multithreaded --features=testing -- --nocapture
1283
1284        counter_multithreaded_aggregation_helper(Temporality::Delta);
1285        counter_multithreaded_aggregation_helper(Temporality::Cumulative);
1286    }
1287
1288    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1289    async fn counter_f64_multithreaded() {
1290        // Run this test with stdout enabled to see output.
1291        // cargo test counter_f64_multithreaded --features=testing -- --nocapture
1292
1293        counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
1294        counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1295    }
1296
1297    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1298    async fn histogram_multithreaded() {
1299        // Run this test with stdout enabled to see output.
1300        // cargo test histogram_multithreaded --features=testing -- --nocapture
1301
1302        histogram_multithreaded_aggregation_helper(Temporality::Delta);
1303        histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
1304    }
1305
1306    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1307    async fn histogram_f64_multithreaded() {
1308        // Run this test with stdout enabled to see output.
1309        // cargo test histogram_f64_multithreaded --features=testing -- --nocapture
1310
1311        histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
1312        histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
1313    }
1314    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1315    async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
1316        // Run this test with stdout enabled to see output.
1317        // cargo test synchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
1318
1319        synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1320        synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1321        synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1322        synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1323    }
1324
1325    fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
1326        instrument_name: &'static str,
1327    ) {
1328        let mut test_context = TestContext::new(Temporality::Cumulative);
1329        let attributes = &[KeyValue::new("key1", "value1")];
1330
1331        // Create instrument and emit measurements
1332        match instrument_name {
1333            "counter" => {
1334                let counter = test_context.meter().u64_counter("test_counter").build();
1335                counter.add(5, &[]);
1336                counter.add(10, attributes);
1337            }
1338            "updown_counter" => {
1339                let updown_counter = test_context
1340                    .meter()
1341                    .i64_up_down_counter("test_updowncounter")
1342                    .build();
1343                updown_counter.add(15, &[]);
1344                updown_counter.add(20, attributes);
1345            }
1346            "histogram" => {
1347                let histogram = test_context.meter().u64_histogram("test_histogram").build();
1348                histogram.record(25, &[]);
1349                histogram.record(30, attributes);
1350            }
1351            "gauge" => {
1352                let gauge = test_context.meter().u64_gauge("test_gauge").build();
1353                gauge.record(35, &[]);
1354                gauge.record(40, attributes);
1355            }
1356            _ => panic!("Incorrect instrument kind provided"),
1357        };
1358
1359        test_context.flush_metrics();
1360
1361        // Test the first export
1362        assert_correct_export(&mut test_context, instrument_name);
1363
1364        // Reset and export again without making any measurements
1365        test_context.reset_metrics();
1366
1367        test_context.flush_metrics();
1368
1369        // Test that latest export has the same data as the previous one
1370        assert_correct_export(&mut test_context, instrument_name);
1371
1372        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1373            match instrument_name {
1374                "counter" => {
1375                    let MetricData::Sum(sum) =
1376                        test_context.get_aggregation::<u64>("test_counter", None)
1377                    else {
1378                        unreachable!()
1379                    };
1380                    assert_eq!(sum.data_points.len(), 2);
1381                    let zero_attribute_datapoint =
1382                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1383                            .expect("datapoint with no attributes expected");
1384                    assert_eq!(zero_attribute_datapoint.value, 5);
1385                    let data_point1 =
1386                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1387                            .expect("datapoint with key1=value1 expected");
1388                    assert_eq!(data_point1.value, 10);
1389                }
1390                "updown_counter" => {
1391                    let MetricData::Sum(sum) =
1392                        test_context.get_aggregation::<i64>("test_updowncounter", None)
1393                    else {
1394                        unreachable!()
1395                    };
1396                    assert_eq!(sum.data_points.len(), 2);
1397                    let zero_attribute_datapoint =
1398                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1399                            .expect("datapoint with no attributes expected");
1400                    assert_eq!(zero_attribute_datapoint.value, 15);
1401                    let data_point1 =
1402                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1403                            .expect("datapoint with key1=value1 expected");
1404                    assert_eq!(data_point1.value, 20);
1405                }
1406                "histogram" => {
1407                    let MetricData::Histogram(histogram_data) =
1408                        test_context.get_aggregation::<u64>("test_histogram", None)
1409                    else {
1410                        unreachable!()
1411                    };
1412                    assert_eq!(histogram_data.data_points.len(), 2);
1413                    let zero_attribute_datapoint =
1414                        find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
1415                            .expect("datapoint with no attributes expected");
1416                    assert_eq!(zero_attribute_datapoint.count, 1);
1417                    assert_eq!(zero_attribute_datapoint.sum, 25);
1418                    assert_eq!(zero_attribute_datapoint.min, Some(25));
1419                    assert_eq!(zero_attribute_datapoint.max, Some(25));
1420                    let data_point1 = find_histogram_datapoint_with_key_value(
1421                        &histogram_data.data_points,
1422                        "key1",
1423                        "value1",
1424                    )
1425                    .expect("datapoint with key1=value1 expected");
1426                    assert_eq!(data_point1.count, 1);
1427                    assert_eq!(data_point1.sum, 30);
1428                    assert_eq!(data_point1.min, Some(30));
1429                    assert_eq!(data_point1.max, Some(30));
1430                }
1431                "gauge" => {
1432                    let MetricData::Gauge(gauge_data) =
1433                        test_context.get_aggregation::<u64>("test_gauge", None)
1434                    else {
1435                        unreachable!()
1436                    };
1437                    assert_eq!(gauge_data.data_points.len(), 2);
1438                    let zero_attribute_datapoint =
1439                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1440                            .expect("datapoint with no attributes expected");
1441                    assert_eq!(zero_attribute_datapoint.value, 35);
1442                    let data_point1 = find_gauge_datapoint_with_key_value(
1443                        &gauge_data.data_points,
1444                        "key1",
1445                        "value1",
1446                    )
1447                    .expect("datapoint with key1=value1 expected");
1448                    assert_eq!(data_point1.value, 40);
1449                }
1450                _ => panic!("Incorrect instrument kind provided"),
1451            }
1452        }
1453    }
1454
1455    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1456    async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
1457        // Run this test with stdout enabled to see output.
1458        // cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture
1459
1460        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1461            "gauge", true,
1462        );
1463        // TODO fix: all asynchronous instruments should not emit data points if not measured
1464        // but these implementations are still buggy
1465        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1466            "counter", false,
1467        );
1468        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1469            "updown_counter",
1470            false,
1471        );
1472    }
1473
1474    #[test]
1475    fn view_test_rename() {
1476        test_view_customization(
1477            |i| {
1478                if i.name == "my_counter" {
1479                    Some(
1480                        Stream::builder()
1481                            .with_name("my_counter_renamed")
1482                            .build()
1483                            .unwrap(),
1484                    )
1485                } else {
1486                    None
1487                }
1488            },
1489            "my_counter_renamed",
1490            "my_unit",
1491            "my_description",
1492        )
1493    }
1494
1495    #[test]
1496    fn view_test_change_unit() {
1497        test_view_customization(
1498            |i| {
1499                if i.name == "my_counter" {
1500                    Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1501                } else {
1502                    None
1503                }
1504            },
1505            "my_counter",
1506            "my_unit_new",
1507            "my_description",
1508        )
1509    }
1510
1511    #[test]
1512    fn view_test_change_description() {
1513        test_view_customization(
1514            |i| {
1515                if i.name == "my_counter" {
1516                    Some(
1517                        Stream::builder()
1518                            .with_description("my_description_new")
1519                            .build()
1520                            .unwrap(),
1521                    )
1522                } else {
1523                    None
1524                }
1525            },
1526            "my_counter",
1527            "my_unit",
1528            "my_description_new",
1529        )
1530    }
1531
1532    #[test]
1533    fn view_test_change_name_unit() {
1534        test_view_customization(
1535            |i| {
1536                if i.name == "my_counter" {
1537                    Some(
1538                        Stream::builder()
1539                            .with_name("my_counter_renamed")
1540                            .with_unit("my_unit_new")
1541                            .build()
1542                            .unwrap(),
1543                    )
1544                } else {
1545                    None
1546                }
1547            },
1548            "my_counter_renamed",
1549            "my_unit_new",
1550            "my_description",
1551        )
1552    }
1553
1554    #[test]
1555    fn view_test_change_name_unit_desc() {
1556        test_view_customization(
1557            |i| {
1558                if i.name == "my_counter" {
1559                    Some(
1560                        Stream::builder()
1561                            .with_name("my_counter_renamed")
1562                            .with_unit("my_unit_new")
1563                            .with_description("my_description_new")
1564                            .build()
1565                            .unwrap(),
1566                    )
1567                } else {
1568                    None
1569                }
1570            },
1571            "my_counter_renamed",
1572            "my_unit_new",
1573            "my_description_new",
1574        )
1575    }
1576
1577    #[test]
1578    fn view_test_match_unit() {
1579        test_view_customization(
1580            |i| {
1581                if i.unit == "my_unit" {
1582                    Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1583                } else {
1584                    None
1585                }
1586            },
1587            "my_counter",
1588            "my_unit_new",
1589            "my_description",
1590        )
1591    }
1592
1593    #[test]
1594    fn view_test_match_none() {
1595        test_view_customization(
1596            |i| {
1597                if i.name == "not_expected_to_match" {
1598                    Some(Stream::builder().build().unwrap())
1599                } else {
1600                    None
1601                }
1602            },
1603            "my_counter",
1604            "my_unit",
1605            "my_description",
1606        )
1607    }
1608
1609    #[test]
1610    fn view_test_match_multiple() {
1611        test_view_customization(
1612            |i| {
1613                if i.name == "my_counter" && i.unit == "my_unit" {
1614                    Some(
1615                        Stream::builder()
1616                            .with_name("my_counter_renamed")
1617                            .build()
1618                            .unwrap(),
1619                    )
1620                } else {
1621                    None
1622                }
1623            },
1624            "my_counter_renamed",
1625            "my_unit",
1626            "my_description",
1627        )
1628    }
1629
1630    /// Helper function to test view customizations
1631    fn test_view_customization<F>(
1632        view_function: F,
1633        expected_name: &str,
1634        expected_unit: &str,
1635        expected_description: &str,
1636    ) where
1637        F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
1638    {
1639        // Run this test with stdout enabled to see output.
1640        // cargo test view_test_* --all-features -- --nocapture
1641
1642        // Arrange
1643        let exporter = InMemoryMetricExporter::default();
1644        let meter_provider = SdkMeterProvider::builder()
1645            .with_periodic_exporter(exporter.clone())
1646            .with_view(view_function)
1647            .build();
1648
1649        // Act
1650        let meter = meter_provider.meter("test");
1651        let counter = meter
1652            .f64_counter("my_counter")
1653            .with_unit("my_unit")
1654            .with_description("my_description")
1655            .build();
1656
1657        counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1658        meter_provider.force_flush().unwrap();
1659
1660        // Assert
1661        let resource_metrics = exporter
1662            .get_finished_metrics()
1663            .expect("metrics are expected to be exported.");
1664        assert_eq!(resource_metrics.len(), 1);
1665        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1666        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1667        assert_eq!(
1668            metric.name, expected_name,
1669            "Expected name: {}.",
1670            expected_name
1671        );
1672        assert_eq!(
1673            metric.unit, expected_unit,
1674            "Expected unit: {}.",
1675            expected_unit
1676        );
1677        assert_eq!(
1678            metric.description, expected_description,
1679            "Expected description: {}.",
1680            expected_description
1681        );
1682    }
1683
1684    // Following are just a basic set of advanced View tests - Views bring a lot
1685    // of permutations and combinations, and we need
1686    // to expand coverage for more scenarios in future.
1687    // It is best to first split this file into multiple files
1688    // based on scenarios (eg: regular aggregation, cardinality, views, view_advanced, etc)
1689    // and then add more tests for each of the scenarios.
1690    #[test]
1691    fn test_view_single_instrument_multiple_stream() {
1692        // Run this test with stdout enabled to see output.
1693        // cargo test test_view_multiple_stream --all-features
1694
1695        // Each of the views match the instrument name "my_counter" and create a
1696        // new stream with a different name. In other words, View can be used to
1697        // create multiple streams for the same instrument.
1698
1699        let view1 = |i: &Instrument| {
1700            if i.name() == "my_counter" {
1701                Some(Stream::builder().with_name("my_counter_1").build().unwrap())
1702            } else {
1703                None
1704            }
1705        };
1706
1707        let view2 = |i: &Instrument| {
1708            if i.name() == "my_counter" {
1709                Some(Stream::builder().with_name("my_counter_2").build().unwrap())
1710            } else {
1711                None
1712            }
1713        };
1714
1715        // Arrange
1716        let exporter = InMemoryMetricExporter::default();
1717        let meter_provider = SdkMeterProvider::builder()
1718            .with_periodic_exporter(exporter.clone())
1719            .with_view(view1)
1720            .with_view(view2)
1721            .build();
1722
1723        // Act
1724        let meter = meter_provider.meter("test");
1725        let counter = meter.f64_counter("my_counter").build();
1726
1727        counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1728        meter_provider.force_flush().unwrap();
1729
1730        // Assert
1731        let resource_metrics = exporter
1732            .get_finished_metrics()
1733            .expect("metrics are expected to be exported.");
1734        assert_eq!(resource_metrics.len(), 1);
1735        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1736        let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1737        assert_eq!(metrics.len(), 2);
1738        assert_eq!(metrics[0].name, "my_counter_1");
1739        assert_eq!(metrics[1].name, "my_counter_2");
1740    }
1741
1742    #[test]
1743    fn test_view_multiple_instrument_single_stream() {
1744        // Run this test with stdout enabled to see output.
1745        // cargo test test_view_multiple_instrument_single_stream --all-features
1746
1747        // The view matches the instrument name "my_counter1" and "my_counter1"
1748        // and create a single new stream for both. In other words, View can be used to
1749        // "merge" multiple instruments into a single stream.
1750        let view = |i: &Instrument| {
1751            if i.name() == "my_counter1" || i.name() == "my_counter2" {
1752                Some(Stream::builder().with_name("my_counter").build().unwrap())
1753            } else {
1754                None
1755            }
1756        };
1757
1758        // Arrange
1759        let exporter = InMemoryMetricExporter::default();
1760        let meter_provider = SdkMeterProvider::builder()
1761            .with_periodic_exporter(exporter.clone())
1762            .with_view(view)
1763            .build();
1764
1765        // Act
1766        let meter = meter_provider.meter("test");
1767        let counter1 = meter.f64_counter("my_counter1").build();
1768        let counter2 = meter.f64_counter("my_counter2").build();
1769
1770        counter1.add(1.5, &[KeyValue::new("key1", "value1")]);
1771        counter2.add(1.5, &[KeyValue::new("key1", "value1")]);
1772        meter_provider.force_flush().unwrap();
1773
1774        // Assert
1775        let resource_metrics = exporter
1776            .get_finished_metrics()
1777            .expect("metrics are expected to be exported.");
1778        assert_eq!(resource_metrics.len(), 1);
1779        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1780        let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1781        assert_eq!(metrics.len(), 1);
1782        assert_eq!(metrics[0].name, "my_counter");
1783        // TODO: Assert that the data points are aggregated correctly.
1784    }
1785
1786    fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1787        instrument_name: &'static str,
1788        should_not_emit: bool,
1789    ) {
1790        let mut test_context = TestContext::new(Temporality::Cumulative);
1791        let attributes = Arc::new([KeyValue::new("key1", "value1")]);
1792
1793        // Create instrument and emit measurements once
1794        match instrument_name {
1795            "counter" => {
1796                let has_run = AtomicBool::new(false);
1797                let _observable_counter = test_context
1798                    .meter()
1799                    .u64_observable_counter("test_counter")
1800                    .with_callback(move |observer| {
1801                        if !has_run.load(Ordering::SeqCst) {
1802                            observer.observe(5, &[]);
1803                            observer.observe(10, &*attributes.clone());
1804                            has_run.store(true, Ordering::SeqCst);
1805                        }
1806                    })
1807                    .build();
1808            }
1809            "updown_counter" => {
1810                let has_run = AtomicBool::new(false);
1811                let _observable_up_down_counter = test_context
1812                    .meter()
1813                    .i64_observable_up_down_counter("test_updowncounter")
1814                    .with_callback(move |observer| {
1815                        if !has_run.load(Ordering::SeqCst) {
1816                            observer.observe(15, &[]);
1817                            observer.observe(20, &*attributes.clone());
1818                            has_run.store(true, Ordering::SeqCst);
1819                        }
1820                    })
1821                    .build();
1822            }
1823            "gauge" => {
1824                let has_run = AtomicBool::new(false);
1825                let _observable_gauge = test_context
1826                    .meter()
1827                    .u64_observable_gauge("test_gauge")
1828                    .with_callback(move |observer| {
1829                        if !has_run.load(Ordering::SeqCst) {
1830                            observer.observe(25, &[]);
1831                            observer.observe(30, &*attributes.clone());
1832                            has_run.store(true, Ordering::SeqCst);
1833                        }
1834                    })
1835                    .build();
1836            }
1837            _ => panic!("Incorrect instrument kind provided"),
1838        };
1839
1840        test_context.flush_metrics();
1841
1842        // Test the first export
1843        assert_correct_export(&mut test_context, instrument_name);
1844
1845        // Reset and export again without making any measurements
1846        test_context.reset_metrics();
1847
1848        test_context.flush_metrics();
1849
1850        if should_not_emit {
1851            test_context.check_no_metrics();
1852        } else {
1853            // Test that latest export has the same data as the previous one
1854            assert_correct_export(&mut test_context, instrument_name);
1855        }
1856
1857        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1858            match instrument_name {
1859                "counter" => {
1860                    let MetricData::Sum(sum) =
1861                        test_context.get_aggregation::<u64>("test_counter", None)
1862                    else {
1863                        unreachable!()
1864                    };
1865                    assert_eq!(sum.data_points.len(), 2);
1866                    assert!(sum.is_monotonic);
1867                    let zero_attribute_datapoint =
1868                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1869                            .expect("datapoint with no attributes expected");
1870                    assert_eq!(zero_attribute_datapoint.value, 5);
1871                    let data_point1 =
1872                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1873                            .expect("datapoint with key1=value1 expected");
1874                    assert_eq!(data_point1.value, 10);
1875                }
1876                "updown_counter" => {
1877                    let MetricData::Sum(sum) =
1878                        test_context.get_aggregation::<i64>("test_updowncounter", None)
1879                    else {
1880                        unreachable!()
1881                    };
1882                    assert_eq!(sum.data_points.len(), 2);
1883                    assert!(!sum.is_monotonic);
1884                    let zero_attribute_datapoint =
1885                        find_sum_datapoint_with_no_attributes(&sum.data_points)
1886                            .expect("datapoint with no attributes expected");
1887                    assert_eq!(zero_attribute_datapoint.value, 15);
1888                    let data_point1 =
1889                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1890                            .expect("datapoint with key1=value1 expected");
1891                    assert_eq!(data_point1.value, 20);
1892                }
1893                "gauge" => {
1894                    let MetricData::Gauge(gauge_data) =
1895                        test_context.get_aggregation::<u64>("test_gauge", None)
1896                    else {
1897                        unreachable!()
1898                    };
1899                    assert_eq!(gauge_data.data_points.len(), 2);
1900                    let zero_attribute_datapoint =
1901                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
1902                            .expect("datapoint with no attributes expected");
1903                    assert_eq!(zero_attribute_datapoint.value, 25);
1904                    let data_point1 = find_gauge_datapoint_with_key_value(
1905                        &gauge_data.data_points,
1906                        "key1",
1907                        "value1",
1908                    )
1909                    .expect("datapoint with key1=value1 expected");
1910                    assert_eq!(data_point1.value, 30);
1911                }
1912                _ => panic!("Incorrect instrument kind provided"),
1913            }
1914        }
1915    }
1916
1917    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
1918        // Arrange
1919        let mut test_context = TestContext::new(temporality);
1920        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));
1921
1922        for i in 0..10 {
1923            thread::scope(|s| {
1924                s.spawn(|| {
1925                    counter.add(1, &[]);
1926
1927                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1928                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1929                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1930
1931                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
1932                    if i % 2 == 0 {
1933                        test_context.flush_metrics();
1934                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
1935                    }
1936
1937                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1938                    counter.add(1, &[KeyValue::new("key1", "value1")]);
1939                });
1940            });
1941        }
1942
1943        test_context.flush_metrics();
1944
1945        // Assert
1946        // We invoke `test_context.flush_metrics()` six times.
1947        let sums = test_context
1948            .get_from_multiple_aggregations::<u64>("my_counter", None, 6)
1949            .into_iter()
1950            .map(|data| {
1951                if let MetricData::Sum(sum) = data {
1952                    sum
1953                } else {
1954                    unreachable!()
1955                }
1956            })
1957            .collect::<Vec<_>>();
1958
1959        let mut sum_zero_attributes = 0;
1960        let mut sum_key1_value1 = 0;
1961        sums.iter().for_each(|sum| {
1962            assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series.
1963            assert!(sum.is_monotonic, "Counter should produce monotonic.");
1964            assert_eq!(sum.temporality, temporality);
1965
1966            if temporality == Temporality::Delta {
1967                sum_zero_attributes += sum.data_points[0].value;
1968                sum_key1_value1 += sum.data_points[1].value;
1969            } else {
1970                sum_zero_attributes = sum.data_points[0].value;
1971                sum_key1_value1 = sum.data_points[1].value;
1972            };
1973        });
1974
1975        assert_eq!(sum_zero_attributes, 10);
1976        assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5.
1977    }
1978
1979    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
1980        // Arrange
1981        let mut test_context = TestContext::new(temporality);
1982        let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());
1983
1984        for i in 0..10 {
1985            thread::scope(|s| {
1986                s.spawn(|| {
1987                    counter.add(1.23, &[]);
1988
1989                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1990                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1991                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
1992
1993                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
1994                    if i % 2 == 0 {
1995                        test_context.flush_metrics();
1996                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
1997                    }
1998
1999                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
2000                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
2001                });
2002            });
2003        }
2004
2005        test_context.flush_metrics();
2006
2007        // Assert
2008        // We invoke `test_context.flush_metrics()` six times.
2009        let sums = test_context
2010            .get_from_multiple_aggregations::<f64>("test_counter", None, 6)
2011            .into_iter()
2012            .map(|data| {
2013                if let MetricData::Sum(sum) = data {
2014                    sum
2015                } else {
2016                    unreachable!()
2017                }
2018            })
2019            .collect::<Vec<_>>();
2020
2021        let mut sum_zero_attributes = 0.0;
2022        let mut sum_key1_value1 = 0.0;
2023        sums.iter().for_each(|sum| {
2024            assert_eq!(sum.data_points.len(), 2); // Expecting 1 time-series.
2025            assert!(sum.is_monotonic, "Counter should produce monotonic.");
2026            assert_eq!(sum.temporality, temporality);
2027
2028            if temporality == Temporality::Delta {
2029                sum_zero_attributes += sum.data_points[0].value;
2030                sum_key1_value1 += sum.data_points[1].value;
2031            } else {
2032                sum_zero_attributes = sum.data_points[0].value;
2033                sum_key1_value1 = sum.data_points[1].value;
2034            };
2035        });
2036
2037        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
2038        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements 5 times = 10 * 5 * 1.23 = 61.5
2039    }
2040
2041    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
2042        // Arrange
2043        let mut test_context = TestContext::new(temporality);
2044        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());
2045
2046        for i in 0..10 {
2047            thread::scope(|s| {
2048                s.spawn(|| {
2049                    histogram.record(1, &[]);
2050                    histogram.record(4, &[]);
2051
2052                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
2053                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
2054                    histogram.record(18, &[KeyValue::new("key1", "value1")]);
2055
2056                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
2057                    if i % 2 == 0 {
2058                        test_context.flush_metrics();
2059                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
2060                    }
2061
2062                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
2063                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
2064                });
2065            });
2066        }
2067
2068        test_context.flush_metrics();
2069
2070        // Assert
2071        // We invoke `test_context.flush_metrics()` six times.
2072        let histograms = test_context
2073            .get_from_multiple_aggregations::<u64>("test_histogram", None, 6)
2074            .into_iter()
2075            .map(|data| {
2076                if let MetricData::Histogram(hist) = data {
2077                    hist
2078                } else {
2079                    unreachable!()
2080                }
2081            })
2082            .collect::<Vec<_>>();
2083
2084        let (
2085            mut sum_zero_attributes,
2086            mut count_zero_attributes,
2087            mut min_zero_attributes,
2088            mut max_zero_attributes,
2089        ) = (0, 0, u64::MAX, u64::MIN);
2090        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
2091            (0, 0, u64::MAX, u64::MIN);
2092
2093        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration
2094        let mut bucket_counts_key1_value1 = vec![0; 16];
2095
2096        histograms.iter().for_each(|histogram| {
2097            assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series.
2098            assert_eq!(histogram.temporality, temporality);
2099
2100            let data_point_zero_attributes =
2101                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
2102            let data_point_key1_value1 =
2103                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
2104                    .unwrap();
2105
2106            if temporality == Temporality::Delta {
2107                sum_zero_attributes += data_point_zero_attributes.sum;
2108                sum_key1_value1 += data_point_key1_value1.sum;
2109
2110                count_zero_attributes += data_point_zero_attributes.count;
2111                count_key1_value1 += data_point_key1_value1.count;
2112
2113                min_zero_attributes =
2114                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
2115                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());
2116
2117                max_zero_attributes =
2118                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
2119                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());
2120
2121                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2122                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2123
2124                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
2125                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
2126                }
2127
2128                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
2129                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
2130                }
2131            } else {
2132                sum_zero_attributes = data_point_zero_attributes.sum;
2133                sum_key1_value1 = data_point_key1_value1.sum;
2134
2135                count_zero_attributes = data_point_zero_attributes.count;
2136                count_key1_value1 = data_point_key1_value1.count;
2137
2138                min_zero_attributes = data_point_zero_attributes.min.unwrap();
2139                min_key1_value1 = data_point_key1_value1.min.unwrap();
2140
2141                max_zero_attributes = data_point_zero_attributes.max.unwrap();
2142                max_key1_value1 = data_point_key1_value1.max.unwrap();
2143
2144                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2145                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2146
2147                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
2148                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
2149            };
2150        });
2151
2152        // Default buckets:
2153        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
2154        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
2155
2156        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements.
2157        assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads record measurements summing up to 5.
2158        assert_eq!(min_zero_attributes, 1);
2159        assert_eq!(max_zero_attributes, 4);
2160
2161        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
2162            match i {
2163                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1 and 4 fall under the bucket (0, 5].
2164                _ => assert_eq!(*count, 0),
2165            }
2166        }
2167
2168        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements.
2169        assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads record measurements summing up to 100 (5 + 7 + 18 + 35 + 35).
2170        assert_eq!(min_key1_value1, 5);
2171        assert_eq!(max_key1_value1, 35);
2172
2173        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
2174            match i {
2175                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls under the bucket (0, 5].
2176                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls under the bucket (5, 10].
2177                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls under the bucket (10, 25].
2178                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35 (recorded twice) falls under the bucket (25, 50].
2179                _ => assert_eq!(*count, 0),
2180            }
2181        }
2182    }
2183
2184    fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
2185        // Arrange
2186        let mut test_context = TestContext::new(temporality);
2187        let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());
2188
2189        for i in 0..10 {
2190            thread::scope(|s| {
2191                s.spawn(|| {
2192                    histogram.record(1.5, &[]);
2193                    histogram.record(4.6, &[]);
2194
2195                    histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
2196                    histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
2197                    histogram.record(18.1, &[KeyValue::new("key1", "value1")]);
2198
2199                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
2200                    if i % 2 == 0 {
2201                        test_context.flush_metrics();
2202                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
2203                    }
2204
2205                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
2206                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
2207                });
2208            });
2209        }
2210
2211        test_context.flush_metrics();
2212
2213        // Assert
2214        // We invoke `test_context.flush_metrics()` six times.
2215        let histograms = test_context
2216            .get_from_multiple_aggregations::<f64>("test_histogram", None, 6)
2217            .into_iter()
2218            .map(|data| {
2219                if let MetricData::Histogram(hist) = data {
2220                    hist
2221                } else {
2222                    unreachable!()
2223                }
2224            })
2225            .collect::<Vec<_>>();
2226
2227        let (
2228            mut sum_zero_attributes,
2229            mut count_zero_attributes,
2230            mut min_zero_attributes,
2231            mut max_zero_attributes,
2232        ) = (0.0, 0, f64::MAX, f64::MIN);
2233        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
2234            (0.0, 0, f64::MAX, f64::MIN);
2235
2236        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration
2237        let mut bucket_counts_key1_value1 = vec![0; 16];
2238
2239        histograms.iter().for_each(|histogram| {
2240            assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series.
2241            assert_eq!(histogram.temporality, temporality);
2242
2243            let data_point_zero_attributes =
2244                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
2245            let data_point_key1_value1 =
2246                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
2247                    .unwrap();
2248
2249            if temporality == Temporality::Delta {
2250                sum_zero_attributes += data_point_zero_attributes.sum;
2251                sum_key1_value1 += data_point_key1_value1.sum;
2252
2253                count_zero_attributes += data_point_zero_attributes.count;
2254                count_key1_value1 += data_point_key1_value1.count;
2255
2256                min_zero_attributes =
2257                    min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
2258                min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());
2259
2260                max_zero_attributes =
2261                    max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
2262                max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());
2263
2264                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2265                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2266
2267                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
2268                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
2269                }
2270
2271                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
2272                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
2273                }
2274            } else {
2275                sum_zero_attributes = data_point_zero_attributes.sum;
2276                sum_key1_value1 = data_point_key1_value1.sum;
2277
2278                count_zero_attributes = data_point_zero_attributes.count;
2279                count_key1_value1 = data_point_key1_value1.count;
2280
2281                min_zero_attributes = data_point_zero_attributes.min.unwrap();
2282                min_key1_value1 = data_point_key1_value1.min.unwrap();
2283
2284                max_zero_attributes = data_point_zero_attributes.max.unwrap();
2285                max_key1_value1 = data_point_key1_value1.max.unwrap();
2286
2287                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
2288                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
2289
2290                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
2291                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
2292            };
2293        });
2294
2295        // Default buckets:
2296        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
2297        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
2298
2299        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements.
2300        assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6)
2301        assert_eq!(min_zero_attributes, 1.5);
2302        assert_eq!(max_zero_attributes, 4.6);
2303
2304        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
2305            match i {
2306                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0].
2307                _ => assert_eq!(*count, 0),
2308            }
2309        }
2310
2311        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements.
2312        assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.4 (5.0 + 7.3 + 18.1 + 35.1 + 35.1).
2313        assert_eq!(min_key1_value1, 5.0);
2314        assert_eq!(max_key1_value1, 35.1);
2315
2316        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
2317            match i {
2318                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0].
2319                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0].
2320                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0].
2321                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0].
2322                _ => assert_eq!(*count, 0),
2323            }
2324        }
2325    }
2326
2327    fn histogram_aggregation_helper(temporality: Temporality) {
2328        // Arrange
2329        let mut test_context = TestContext::new(temporality);
2330        let histogram = test_context.meter().u64_histogram("my_histogram").build();
2331
2332        // Act
2333        let mut rand = rngs::SmallRng::from_os_rng();
2334        let values_kv1 = (0..50)
2335            .map(|_| rand.random_range(0..100))
2336            .collect::<Vec<u64>>();
2337        for value in values_kv1.iter() {
2338            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2339        }
2340
2341        let values_kv2 = (0..30)
2342            .map(|_| rand.random_range(0..100))
2343            .collect::<Vec<u64>>();
2344        for value in values_kv2.iter() {
2345            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2346        }
2347
2348        test_context.flush_metrics();
2349
2350        // Assert
2351        let MetricData::Histogram(histogram_data) =
2352            test_context.get_aggregation::<u64>("my_histogram", None)
2353        else {
2354            unreachable!()
2355        };
2356        // Expecting 2 time-series.
2357        assert_eq!(histogram_data.data_points.len(), 2);
2358        if let Temporality::Cumulative = temporality {
2359            assert_eq!(
2360                histogram_data.temporality,
2361                Temporality::Cumulative,
2362                "Should produce cumulative"
2363            );
2364        } else {
2365            assert_eq!(
2366                histogram_data.temporality,
2367                Temporality::Delta,
2368                "Should produce delta"
2369            );
2370        }
2371
2372        // find and validate key1=value2 datapoint
2373        let data_point1 =
2374            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2375                .expect("datapoint with key1=value1 expected");
2376        assert_eq!(data_point1.count, values_kv1.len() as u64);
2377        assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2378        assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2379        assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2380
2381        let data_point2 =
2382            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2383                .expect("datapoint with key1=value2 expected");
2384        assert_eq!(data_point2.count, values_kv2.len() as u64);
2385        assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
2386        assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
2387        assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
2388
2389        // Reset and report more measurements
2390        test_context.reset_metrics();
2391        for value in values_kv1.iter() {
2392            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2393        }
2394
2395        for value in values_kv2.iter() {
2396            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2397        }
2398
2399        test_context.flush_metrics();
2400
2401        let MetricData::Histogram(histogram_data) =
2402            test_context.get_aggregation::<u64>("my_histogram", None)
2403        else {
2404            unreachable!()
2405        };
2406        assert_eq!(histogram_data.data_points.len(), 2);
2407        let data_point1 =
2408            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2409                .expect("datapoint with key1=value1 expected");
2410        if temporality == Temporality::Cumulative {
2411            assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
2412            assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
2413            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2414            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2415        } else {
2416            assert_eq!(data_point1.count, values_kv1.len() as u64);
2417            assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2418            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2419            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2420        }
2421
2422        let data_point1 =
2423            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2424                .expect("datapoint with key1=value1 expected");
2425        if temporality == Temporality::Cumulative {
2426            assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
2427            assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
2428            assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2429            assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2430        } else {
2431            assert_eq!(data_point1.count, values_kv2.len() as u64);
2432            assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
2433            assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2434            assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2435        }
2436    }
2437
2438    fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
2439        let mut test_context = TestContext::new(temporality);
2440        let histogram = test_context
2441            .meter()
2442            .u64_histogram("test_histogram")
2443            .with_boundaries(vec![1.0, 2.5, 5.5])
2444            .build();
2445        histogram.record(1, &[KeyValue::new("key1", "value1")]);
2446        histogram.record(2, &[KeyValue::new("key1", "value1")]);
2447        histogram.record(3, &[KeyValue::new("key1", "value1")]);
2448        histogram.record(4, &[KeyValue::new("key1", "value1")]);
2449        histogram.record(5, &[KeyValue::new("key1", "value1")]);
2450
2451        test_context.flush_metrics();
2452
2453        // Assert
2454        let MetricData::Histogram(histogram_data) =
2455            test_context.get_aggregation::<u64>("test_histogram", None)
2456        else {
2457            unreachable!()
2458        };
2459        // Expecting 2 time-series.
2460        assert_eq!(histogram_data.data_points.len(), 1);
2461        if let Temporality::Cumulative = temporality {
2462            assert_eq!(
2463                histogram_data.temporality,
2464                Temporality::Cumulative,
2465                "Should produce cumulative"
2466            );
2467        } else {
2468            assert_eq!(
2469                histogram_data.temporality,
2470                Temporality::Delta,
2471                "Should produce delta"
2472            );
2473        }
2474
2475        // find and validate key1=value1 datapoint
2476        let data_point =
2477            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2478                .expect("datapoint with key1=value1 expected");
2479
2480        assert_eq!(data_point.count, 5);
2481        assert_eq!(data_point.sum, 15);
2482
2483        // Check the bucket counts
2484        // -∞ to 1.0: 1
2485        // 1.0 to 2.5: 1
2486        // 2.5 to 5.5: 3
2487        // 5.5 to +∞: 0
2488
2489        assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
2490        assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
2491    }
2492
2493    fn histogram_aggregation_with_empty_bounds_helper(temporality: Temporality) {
2494        let mut test_context = TestContext::new(temporality);
2495        let histogram = test_context
2496            .meter()
2497            .u64_histogram("test_histogram")
2498            .with_boundaries(vec![])
2499            .build();
2500        histogram.record(1, &[KeyValue::new("key1", "value1")]);
2501        histogram.record(2, &[KeyValue::new("key1", "value1")]);
2502        histogram.record(3, &[KeyValue::new("key1", "value1")]);
2503        histogram.record(4, &[KeyValue::new("key1", "value1")]);
2504        histogram.record(5, &[KeyValue::new("key1", "value1")]);
2505
2506        test_context.flush_metrics();
2507
2508        // Assert
2509        let MetricData::Histogram(histogram_data) =
2510            test_context.get_aggregation::<u64>("test_histogram", None)
2511        else {
2512            unreachable!()
2513        };
2514        // Expecting 1 time-series.
2515        assert_eq!(histogram_data.data_points.len(), 1);
2516        if let Temporality::Cumulative = temporality {
2517            assert_eq!(
2518                histogram_data.temporality,
2519                Temporality::Cumulative,
2520                "Should produce cumulative"
2521            );
2522        } else {
2523            assert_eq!(
2524                histogram_data.temporality,
2525                Temporality::Delta,
2526                "Should produce delta"
2527            );
2528        }
2529
2530        // find and validate key1=value1 datapoint
2531        let data_point =
2532            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2533                .expect("datapoint with key1=value1 expected");
2534
2535        assert_eq!(data_point.count, 5);
2536        assert_eq!(data_point.sum, 15);
2537        assert!(data_point.bounds.is_empty());
2538        assert!(data_point.bucket_counts.is_empty());
2539    }
2540
2541    fn gauge_aggregation_helper(temporality: Temporality) {
2542        // Arrange
2543        let mut test_context = TestContext::new(temporality);
2544        let gauge = test_context.meter().i64_gauge("my_gauge").build();
2545
2546        // Act
2547        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2548        gauge.record(2, &[KeyValue::new("key1", "value1")]);
2549        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2550        gauge.record(3, &[KeyValue::new("key1", "value1")]);
2551        gauge.record(4, &[KeyValue::new("key1", "value1")]);
2552
2553        gauge.record(11, &[KeyValue::new("key1", "value2")]);
2554        gauge.record(13, &[KeyValue::new("key1", "value2")]);
2555        gauge.record(6, &[KeyValue::new("key1", "value2")]);
2556
2557        test_context.flush_metrics();
2558
2559        // Assert
2560        let MetricData::Gauge(gauge_data_point) =
2561            test_context.get_aggregation::<i64>("my_gauge", None)
2562        else {
2563            unreachable!()
2564        };
2565        // Expecting 2 time-series.
2566        assert_eq!(gauge_data_point.data_points.len(), 2);
2567
2568        // find and validate key1=value2 datapoint
2569        let data_point1 =
2570            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
2571                .expect("datapoint with key1=value1 expected");
2572        assert_eq!(data_point1.value, 4);
2573
2574        let data_point1 =
2575            find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
2576                .expect("datapoint with key1=value2 expected");
2577        assert_eq!(data_point1.value, 6);
2578
2579        // Reset and report more measurements
2580        test_context.reset_metrics();
2581        gauge.record(1, &[KeyValue::new("key1", "value1")]);
2582        gauge.record(2, &[KeyValue::new("key1", "value1")]);
2583        gauge.record(11, &[KeyValue::new("key1", "value1")]);
2584        gauge.record(3, &[KeyValue::new("key1", "value1")]);
2585        gauge.record(41, &[KeyValue::new("key1", "value1")]);
2586
2587        gauge.record(34, &[KeyValue::new("key1", "value2")]);
2588        gauge.record(12, &[KeyValue::new("key1", "value2")]);
2589        gauge.record(54, &[KeyValue::new("key1", "value2")]);
2590
2591        test_context.flush_metrics();
2592
2593        let MetricData::Gauge(gauge) = test_context.get_aggregation::<i64>("my_gauge", None) else {
2594            unreachable!()
2595        };
2596        assert_eq!(gauge.data_points.len(), 2);
2597        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2598            .expect("datapoint with key1=value1 expected");
2599        assert_eq!(data_point1.value, 41);
2600
2601        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
2602            .expect("datapoint with key1=value2 expected");
2603        assert_eq!(data_point1.value, 54);
2604    }
2605
2606    fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
2607        // Arrange
2608        let mut test_context = TestContext::new(temporality);
2609        let _observable_gauge = test_context
2610            .meter()
2611            .i64_observable_gauge("test_observable_gauge")
2612            .with_callback(move |observer| {
2613                if use_empty_attributes {
2614                    observer.observe(1, &[]);
2615                }
2616                observer.observe(4, &[KeyValue::new("key1", "value1")]);
2617                observer.observe(5, &[KeyValue::new("key2", "value2")]);
2618            })
2619            .build();
2620
2621        test_context.flush_metrics();
2622
2623        // Assert
2624        let MetricData::Gauge(gauge) =
2625            test_context.get_aggregation::<i64>("test_observable_gauge", None)
2626        else {
2627            unreachable!()
2628        };
2629        // Expecting 2 time-series.
2630        let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
2631        assert_eq!(gauge.data_points.len(), expected_time_series_count);
2632
2633        if use_empty_attributes {
2634            // find and validate zero attribute datapoint
2635            let zero_attribute_datapoint =
2636                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2637                    .expect("datapoint with no attributes expected");
2638            assert_eq!(zero_attribute_datapoint.value, 1);
2639        }
2640
2641        // find and validate key1=value1 datapoint
2642        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2643            .expect("datapoint with key1=value1 expected");
2644        assert_eq!(data_point1.value, 4);
2645
2646        // find and validate key2=value2 datapoint
2647        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2648            .expect("datapoint with key2=value2 expected");
2649        assert_eq!(data_point2.value, 5);
2650
2651        // Reset and report more measurements
2652        test_context.reset_metrics();
2653
2654        test_context.flush_metrics();
2655
2656        let MetricData::Gauge(gauge) =
2657            test_context.get_aggregation::<i64>("test_observable_gauge", None)
2658        else {
2659            unreachable!()
2660        };
2661        assert_eq!(gauge.data_points.len(), expected_time_series_count);
2662
2663        if use_empty_attributes {
2664            let zero_attribute_datapoint =
2665                find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2666                    .expect("datapoint with no attributes expected");
2667            assert_eq!(zero_attribute_datapoint.value, 1);
2668        }
2669
2670        let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2671            .expect("datapoint with key1=value1 expected");
2672        assert_eq!(data_point1.value, 4);
2673
2674        let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2675            .expect("datapoint with key2=value2 expected");
2676        assert_eq!(data_point2.value, 5);
2677    }
2678
2679    fn counter_aggregation_helper(temporality: Temporality) {
2680        // Arrange
2681        let mut test_context = TestContext::new(temporality);
2682        let counter = test_context.u64_counter("test", "my_counter", None);
2683
2684        // Act
2685        counter.add(1, &[KeyValue::new("key1", "value1")]);
2686        counter.add(1, &[KeyValue::new("key1", "value1")]);
2687        counter.add(1, &[KeyValue::new("key1", "value1")]);
2688        counter.add(1, &[KeyValue::new("key1", "value1")]);
2689        counter.add(1, &[KeyValue::new("key1", "value1")]);
2690
2691        counter.add(1, &[KeyValue::new("key1", "value2")]);
2692        counter.add(1, &[KeyValue::new("key1", "value2")]);
2693        counter.add(1, &[KeyValue::new("key1", "value2")]);
2694
2695        test_context.flush_metrics();
2696
2697        // Assert
2698        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2699            unreachable!()
2700        };
2701        // Expecting 2 time-series.
2702        assert_eq!(sum.data_points.len(), 2);
2703        assert!(sum.is_monotonic, "Counter should produce monotonic.");
2704        if let Temporality::Cumulative = temporality {
2705            assert_eq!(
2706                sum.temporality,
2707                Temporality::Cumulative,
2708                "Should produce cumulative"
2709            );
2710        } else {
2711            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
2712        }
2713
2714        // find and validate key1=value2 datapoint
2715        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2716            .expect("datapoint with key1=value1 expected");
2717        assert_eq!(data_point1.value, 5);
2718
2719        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2720            .expect("datapoint with key1=value2 expected");
2721        assert_eq!(data_point1.value, 3);
2722
2723        // Reset and report more measurements
2724        test_context.reset_metrics();
2725        counter.add(1, &[KeyValue::new("key1", "value1")]);
2726        counter.add(1, &[KeyValue::new("key1", "value1")]);
2727        counter.add(1, &[KeyValue::new("key1", "value1")]);
2728        counter.add(1, &[KeyValue::new("key1", "value1")]);
2729        counter.add(1, &[KeyValue::new("key1", "value1")]);
2730
2731        counter.add(1, &[KeyValue::new("key1", "value2")]);
2732        counter.add(1, &[KeyValue::new("key1", "value2")]);
2733        counter.add(1, &[KeyValue::new("key1", "value2")]);
2734
2735        test_context.flush_metrics();
2736
2737        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2738            unreachable!()
2739        };
2740        assert_eq!(sum.data_points.len(), 2);
2741        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2742            .expect("datapoint with key1=value1 expected");
2743        if temporality == Temporality::Cumulative {
2744            assert_eq!(data_point1.value, 10);
2745        } else {
2746            assert_eq!(data_point1.value, 5);
2747        }
2748
2749        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2750            .expect("datapoint with key1=value2 expected");
2751        if temporality == Temporality::Cumulative {
2752            assert_eq!(data_point1.value, 6);
2753        } else {
2754            assert_eq!(data_point1.value, 3);
2755        }
2756    }
2757
2758    fn counter_aggregation_overflow_helper(temporality: Temporality) {
2759        // Arrange
2760        let mut test_context = TestContext::new(temporality);
2761        let counter = test_context.u64_counter("test", "my_counter", None);
2762
2763        // Act
2764        // Record measurements with A:0, A:1,.......A:1999, which just fits in the 2000 limit
2765        for v in 0..2000 {
2766            counter.add(100, &[KeyValue::new("A", v.to_string())]);
2767        }
2768
2769        // Empty attributes is specially treated and does not count towards the limit.
2770        counter.add(3, &[]);
2771        counter.add(3, &[]);
2772
2773        // All of the below will now go into overflow.
2774        counter.add(100, &[KeyValue::new("A", "foo")]);
2775        counter.add(100, &[KeyValue::new("A", "another")]);
2776        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2777        test_context.flush_metrics();
2778
2779        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2780            unreachable!()
2781        };
2782
2783        // Expecting 2002 metric points. (2000 + 1 overflow + Empty attributes)
2784        assert_eq!(sum.data_points.len(), 2002);
2785
2786        let data_point =
2787            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2788        assert_eq!(data_point.value, 300);
2789
2790        // let empty_attrs_data_point = &sum.data_points[0];
2791        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2792            .expect("Empty attributes point expected");
2793        assert!(
2794            empty_attrs_data_point.attributes.is_empty(),
2795            "Non-empty attribute set"
2796        );
2797        assert_eq!(
2798            empty_attrs_data_point.value, 6,
2799            "Empty attributes value should be 3+3=6"
2800        );
2801
2802        // Phase 2 - for delta temporality, after each collect, data points are cleared
2803        // but for cumulative, they are not cleared.
2804        test_context.reset_metrics();
2805        // The following should be aggregated normally for Delta,
2806        // and should go into overflow for Cumulative.
2807        counter.add(100, &[KeyValue::new("A", "foo")]);
2808        counter.add(100, &[KeyValue::new("A", "another")]);
2809        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2810        test_context.flush_metrics();
2811
2812        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2813            unreachable!()
2814        };
2815
2816        if temporality == Temporality::Delta {
2817            assert_eq!(sum.data_points.len(), 3);
2818
2819            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2820                .expect("point expected");
2821            assert_eq!(data_point.value, 100);
2822
2823            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2824                .expect("point expected");
2825            assert_eq!(data_point.value, 100);
2826
2827            let data_point =
2828                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2829                    .expect("point expected");
2830            assert_eq!(data_point.value, 100);
2831        } else {
2832            // For cumulative, overflow should still be there, and new points should not be added.
2833            assert_eq!(sum.data_points.len(), 2002);
2834            let data_point =
2835                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2836            assert_eq!(data_point.value, 600);
2837
2838            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2839            assert!(data_point.is_none(), "point should not be present");
2840
2841            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2842            assert!(data_point.is_none(), "point should not be present");
2843
2844            let data_point =
2845                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2846            assert!(data_point.is_none(), "point should not be present");
2847        }
2848    }
2849
2850    fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) {
2851        // Arrange
2852        let cardinality_limit = 2300;
2853        let view_change_cardinality = move |i: &Instrument| {
2854            if i.name == "my_counter" {
2855                Some(
2856                    Stream::builder()
2857                        .with_name("my_counter")
2858                        .with_cardinality_limit(cardinality_limit)
2859                        .build()
2860                        .unwrap(),
2861                )
2862            } else {
2863                None
2864            }
2865        };
2866        let mut test_context = TestContext::new_with_view(temporality, view_change_cardinality);
2867        let counter = test_context.u64_counter("test", "my_counter", None);
2868
2869        // Act
2870        // Record measurements with A:0, A:1,.......A:cardinality_limit, which just fits in the cardinality_limit
2871        for v in 0..cardinality_limit {
2872            counter.add(100, &[KeyValue::new("A", v.to_string())]);
2873        }
2874
2875        // Empty attributes is specially treated and does not count towards the limit.
2876        counter.add(3, &[]);
2877        counter.add(3, &[]);
2878
2879        // All of the below will now go into overflow.
2880        counter.add(100, &[KeyValue::new("A", "foo")]);
2881        counter.add(100, &[KeyValue::new("A", "another")]);
2882        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2883        test_context.flush_metrics();
2884
2885        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2886            unreachable!()
2887        };
2888
2889        // Expecting (cardinality_limit + 1 overflow + Empty attributes) data points.
2890        assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2891
2892        let data_point =
2893            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2894        assert_eq!(data_point.value, 300);
2895
2896        // let empty_attrs_data_point = &sum.data_points[0];
2897        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
2898            .expect("Empty attributes point expected");
2899        assert!(
2900            empty_attrs_data_point.attributes.is_empty(),
2901            "Non-empty attribute set"
2902        );
2903        assert_eq!(
2904            empty_attrs_data_point.value, 6,
2905            "Empty attributes value should be 3+3=6"
2906        );
2907
2908        // Phase 2 - for delta temporality, after each collect, data points are cleared
2909        // but for cumulative, they are not cleared.
2910        test_context.reset_metrics();
2911        // The following should be aggregated normally for Delta,
2912        // and should go into overflow for Cumulative.
2913        counter.add(100, &[KeyValue::new("A", "foo")]);
2914        counter.add(100, &[KeyValue::new("A", "another")]);
2915        counter.add(100, &[KeyValue::new("A", "yet_another")]);
2916        test_context.flush_metrics();
2917
2918        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2919            unreachable!()
2920        };
2921
2922        if temporality == Temporality::Delta {
2923            assert_eq!(sum.data_points.len(), 3);
2924
2925            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
2926                .expect("point expected");
2927            assert_eq!(data_point.value, 100);
2928
2929            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
2930                .expect("point expected");
2931            assert_eq!(data_point.value, 100);
2932
2933            let data_point =
2934                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
2935                    .expect("point expected");
2936            assert_eq!(data_point.value, 100);
2937        } else {
2938            // For cumulative, overflow should still be there, and new points should not be added.
2939            assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);
2940            let data_point =
2941                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
2942            assert_eq!(data_point.value, 600);
2943
2944            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
2945            assert!(data_point.is_none(), "point should not be present");
2946
2947            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
2948            assert!(data_point.is_none(), "point should not be present");
2949
2950            let data_point =
2951                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
2952            assert!(data_point.is_none(), "point should not be present");
2953        }
2954    }
2955
2956    fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
2957        // Arrange
2958        let mut test_context = TestContext::new(temporality);
2959        let counter = test_context.u64_counter("test", "my_counter", None);
2960
2961        // Act
2962        // Add the same set of attributes in different order. (they are expected
2963        // to be treated as same attributes)
2964        // start with sorted order
2965        if start_sorted {
2966            counter.add(
2967                1,
2968                &[
2969                    KeyValue::new("A", "a"),
2970                    KeyValue::new("B", "b"),
2971                    KeyValue::new("C", "c"),
2972                ],
2973            );
2974        } else {
2975            counter.add(
2976                1,
2977                &[
2978                    KeyValue::new("A", "a"),
2979                    KeyValue::new("C", "c"),
2980                    KeyValue::new("B", "b"),
2981                ],
2982            );
2983        }
2984
2985        counter.add(
2986            1,
2987            &[
2988                KeyValue::new("A", "a"),
2989                KeyValue::new("C", "c"),
2990                KeyValue::new("B", "b"),
2991            ],
2992        );
2993        counter.add(
2994            1,
2995            &[
2996                KeyValue::new("B", "b"),
2997                KeyValue::new("A", "a"),
2998                KeyValue::new("C", "c"),
2999            ],
3000        );
3001        counter.add(
3002            1,
3003            &[
3004                KeyValue::new("B", "b"),
3005                KeyValue::new("C", "c"),
3006                KeyValue::new("A", "a"),
3007            ],
3008        );
3009        counter.add(
3010            1,
3011            &[
3012                KeyValue::new("C", "c"),
3013                KeyValue::new("B", "b"),
3014                KeyValue::new("A", "a"),
3015            ],
3016        );
3017        counter.add(
3018            1,
3019            &[
3020                KeyValue::new("C", "c"),
3021                KeyValue::new("A", "a"),
3022                KeyValue::new("B", "b"),
3023            ],
3024        );
3025        test_context.flush_metrics();
3026
3027        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
3028            unreachable!()
3029        };
3030
3031        // Expecting 1 time-series.
3032        assert_eq!(sum.data_points.len(), 1);
3033
3034        // validate the sole datapoint
3035        let data_point1 = &sum.data_points[0];
3036        assert_eq!(data_point1.value, 6);
3037    }
3038
3039    fn updown_counter_aggregation_helper(temporality: Temporality) {
3040        // Arrange
3041        let mut test_context = TestContext::new(temporality);
3042        let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
3043
3044        // Act
3045        counter.add(10, &[KeyValue::new("key1", "value1")]);
3046        counter.add(-1, &[KeyValue::new("key1", "value1")]);
3047        counter.add(-5, &[KeyValue::new("key1", "value1")]);
3048        counter.add(0, &[KeyValue::new("key1", "value1")]);
3049        counter.add(1, &[KeyValue::new("key1", "value1")]);
3050
3051        counter.add(10, &[KeyValue::new("key1", "value2")]);
3052        counter.add(0, &[KeyValue::new("key1", "value2")]);
3053        counter.add(-3, &[KeyValue::new("key1", "value2")]);
3054
3055        test_context.flush_metrics();
3056
3057        // Assert
3058        let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3059        else {
3060            unreachable!()
3061        };
3062        // Expecting 2 time-series.
3063        assert_eq!(sum.data_points.len(), 2);
3064        assert!(
3065            !sum.is_monotonic,
3066            "UpDownCounter should produce non-monotonic."
3067        );
3068        assert_eq!(
3069            sum.temporality,
3070            Temporality::Cumulative,
3071            "Should produce Cumulative for UpDownCounter"
3072        );
3073
3074        // find and validate key1=value2 datapoint
3075        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3076            .expect("datapoint with key1=value1 expected");
3077        assert_eq!(data_point1.value, 5);
3078
3079        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3080            .expect("datapoint with key1=value2 expected");
3081        assert_eq!(data_point1.value, 7);
3082
3083        // Reset and report more measurements
3084        test_context.reset_metrics();
3085        counter.add(10, &[KeyValue::new("key1", "value1")]);
3086        counter.add(-1, &[KeyValue::new("key1", "value1")]);
3087        counter.add(-5, &[KeyValue::new("key1", "value1")]);
3088        counter.add(0, &[KeyValue::new("key1", "value1")]);
3089        counter.add(1, &[KeyValue::new("key1", "value1")]);
3090
3091        counter.add(10, &[KeyValue::new("key1", "value2")]);
3092        counter.add(0, &[KeyValue::new("key1", "value2")]);
3093        counter.add(-3, &[KeyValue::new("key1", "value2")]);
3094
3095        test_context.flush_metrics();
3096
3097        let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3098        else {
3099            unreachable!()
3100        };
3101        assert_eq!(sum.data_points.len(), 2);
3102        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3103            .expect("datapoint with key1=value1 expected");
3104        assert_eq!(data_point1.value, 10);
3105
3106        let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3107            .expect("datapoint with key1=value2 expected");
3108        assert_eq!(data_point1.value, 14);
3109    }
3110
3111    fn find_sum_datapoint_with_key_value<'a, T>(
3112        data_points: &'a [SumDataPoint<T>],
3113        key: &str,
3114        value: &str,
3115    ) -> Option<&'a SumDataPoint<T>> {
3116        data_points.iter().find(|&datapoint| {
3117            datapoint
3118                .attributes
3119                .iter()
3120                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3121        })
3122    }
3123
3124    fn find_overflow_sum_datapoint<T>(data_points: &[SumDataPoint<T>]) -> Option<&SumDataPoint<T>> {
3125        data_points.iter().find(|&datapoint| {
3126            datapoint.attributes.iter().any(|kv| {
3127                kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true)
3128            })
3129        })
3130    }
3131
3132    fn find_gauge_datapoint_with_key_value<'a, T>(
3133        data_points: &'a [GaugeDataPoint<T>],
3134        key: &str,
3135        value: &str,
3136    ) -> Option<&'a GaugeDataPoint<T>> {
3137        data_points.iter().find(|&datapoint| {
3138            datapoint
3139                .attributes
3140                .iter()
3141                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3142        })
3143    }
3144
3145    fn find_sum_datapoint_with_no_attributes<T>(
3146        data_points: &[SumDataPoint<T>],
3147    ) -> Option<&SumDataPoint<T>> {
3148        data_points
3149            .iter()
3150            .find(|&datapoint| datapoint.attributes.is_empty())
3151    }
3152
3153    fn find_gauge_datapoint_with_no_attributes<T>(
3154        data_points: &[GaugeDataPoint<T>],
3155    ) -> Option<&GaugeDataPoint<T>> {
3156        data_points
3157            .iter()
3158            .find(|&datapoint| datapoint.attributes.is_empty())
3159    }
3160
3161    fn find_histogram_datapoint_with_key_value<'a, T>(
3162        data_points: &'a [HistogramDataPoint<T>],
3163        key: &str,
3164        value: &str,
3165    ) -> Option<&'a HistogramDataPoint<T>> {
3166        data_points.iter().find(|&datapoint| {
3167            datapoint
3168                .attributes
3169                .iter()
3170                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3171        })
3172    }
3173
3174    fn find_histogram_datapoint_with_no_attributes<T>(
3175        data_points: &[HistogramDataPoint<T>],
3176    ) -> Option<&HistogramDataPoint<T>> {
3177        data_points
3178            .iter()
3179            .find(|&datapoint| datapoint.attributes.is_empty())
3180    }
3181
3182    fn find_scope_metric<'a>(
3183        metrics: &'a [ScopeMetrics],
3184        name: &'a str,
3185    ) -> Option<&'a ScopeMetrics> {
3186        metrics
3187            .iter()
3188            .find(|&scope_metric| scope_metric.scope.name() == name)
3189    }
3190
    /// Test harness that pairs an in-memory metric exporter with the meter provider
    /// exporting into it, plus storage for the most recently fetched export results.
    struct TestContext {
        /// In-memory exporter from which the tests read back finished metrics.
        exporter: InMemoryMetricExporter,
        /// Meter provider used to create the meters and instruments under test.
        meter_provider: SdkMeterProvider,

        // Saving this on the test context for lifetime simplicity
        /// Finished metrics last fetched from `exporter`; kept here so that accessor
        /// methods can hand out references into it.
        resource_metrics: Vec<ResourceMetrics>,
    }
3198
3199    impl TestContext {
3200        fn new(temporality: Temporality) -> Self {
3201            let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3202            let exporter = exporter.build();
3203            let meter_provider = SdkMeterProvider::builder()
3204                .with_periodic_exporter(exporter.clone())
3205                .build();
3206
3207            TestContext {
3208                exporter,
3209                meter_provider,
3210                resource_metrics: vec![],
3211            }
3212        }
3213
3214        fn new_with_view<T>(temporality: Temporality, view: T) -> Self
3215        where
3216            T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
3217        {
3218            let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3219            let exporter = exporter.build();
3220            let meter_provider = SdkMeterProvider::builder()
3221                .with_periodic_exporter(exporter.clone())
3222                .with_view(view)
3223                .build();
3224
3225            TestContext {
3226                exporter,
3227                meter_provider,
3228                resource_metrics: vec![],
3229            }
3230        }
3231
3232        fn u64_counter(
3233            &self,
3234            meter_name: &'static str,
3235            counter_name: &'static str,
3236            unit: Option<&'static str>,
3237        ) -> Counter<u64> {
3238            let meter = self.meter_provider.meter(meter_name);
3239            let mut counter_builder = meter.u64_counter(counter_name);
3240            if let Some(unit_name) = unit {
3241                counter_builder = counter_builder.with_unit(unit_name);
3242            }
3243            counter_builder.build()
3244        }
3245
3246        fn i64_up_down_counter(
3247            &self,
3248            meter_name: &'static str,
3249            counter_name: &'static str,
3250            unit: Option<&'static str>,
3251        ) -> UpDownCounter<i64> {
3252            let meter = self.meter_provider.meter(meter_name);
3253            let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
3254            if let Some(unit_name) = unit {
3255                updown_counter_builder = updown_counter_builder.with_unit(unit_name);
3256            }
3257            updown_counter_builder.build()
3258        }
3259
3260        fn meter(&self) -> Meter {
3261            self.meter_provider.meter("test")
3262        }
3263
3264        fn flush_metrics(&self) {
3265            self.meter_provider.force_flush().unwrap();
3266        }
3267
3268        fn reset_metrics(&self) {
3269            self.exporter.reset();
3270        }
3271
3272        fn check_no_metrics(&self) {
3273            let resource_metrics = self
3274                .exporter
3275                .get_finished_metrics()
3276                .expect("metrics expected to be exported"); // TODO: Need to fix InMemoryMetricExporter to return None.
3277
3278            assert!(resource_metrics.is_empty(), "no metrics should be exported");
3279        }
3280
3281        fn get_aggregation<T: Number>(
3282            &mut self,
3283            counter_name: &str,
3284            unit_name: Option<&str>,
3285        ) -> &MetricData<T> {
3286            self.resource_metrics = self
3287                .exporter
3288                .get_finished_metrics()
3289                .expect("metrics expected to be exported");
3290
3291            assert!(
3292                !self.resource_metrics.is_empty(),
3293                "no metrics were exported"
3294            );
3295
3296            assert!(
3297                self.resource_metrics.len() == 1,
3298                "Expected single resource metrics."
3299            );
3300            let resource_metric = self
3301                .resource_metrics
3302                .first()
3303                .expect("This should contain exactly one resource metric, as validated above.");
3304
3305            assert!(
3306                !resource_metric.scope_metrics.is_empty(),
3307                "No scope metrics in latest export"
3308            );
3309            assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3310
3311            let metric = &resource_metric.scope_metrics[0].metrics[0];
3312            assert_eq!(metric.name, counter_name);
3313            if let Some(expected_unit) = unit_name {
3314                assert_eq!(metric.unit, expected_unit);
3315            }
3316
3317            T::extract_metrics_data_ref(&metric.data)
3318                .expect("Failed to cast aggregation to expected type")
3319        }
3320
3321        fn get_from_multiple_aggregations<T: Number>(
3322            &mut self,
3323            counter_name: &str,
3324            unit_name: Option<&str>,
3325            invocation_count: usize,
3326        ) -> Vec<&MetricData<T>> {
3327            self.resource_metrics = self
3328                .exporter
3329                .get_finished_metrics()
3330                .expect("metrics expected to be exported");
3331
3332            assert!(
3333                !self.resource_metrics.is_empty(),
3334                "no metrics were exported"
3335            );
3336
3337            assert_eq!(
3338                self.resource_metrics.len(),
3339                invocation_count,
3340                "Expected collect to be called {} times",
3341                invocation_count
3342            );
3343
3344            let result = self
3345                .resource_metrics
3346                .iter()
3347                .map(|resource_metric| {
3348                    assert!(
3349                        !resource_metric.scope_metrics.is_empty(),
3350                        "An export with no scope metrics occurred"
3351                    );
3352
3353                    assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3354
3355                    let metric = &resource_metric.scope_metrics[0].metrics[0];
3356                    assert_eq!(metric.name, counter_name);
3357
3358                    if let Some(expected_unit) = unit_name {
3359                        assert_eq!(metric.unit, expected_unit);
3360                    }
3361
3362                    let aggregation = T::extract_metrics_data_ref(&metric.data)
3363                        .expect("Failed to cast aggregation to expected type");
3364                    aggregation
3365                })
3366                .collect::<Vec<_>>();
3367
3368            result
3369        }
3370    }
3371}