1#[allow(unreachable_pub)]
43#[allow(unused)]
44pub(crate) mod aggregation;
45pub mod data;
46mod error;
47pub mod exporter;
48pub(crate) mod instrument;
49pub(crate) mod internal;
50#[cfg(feature = "experimental_metrics_custom_reader")]
51pub(crate) mod manual_reader;
52pub(crate) mod meter;
53mod meter_provider;
54pub(crate) mod noop;
55pub(crate) mod periodic_reader;
56#[cfg(feature = "experimental_metrics_periodicreader_with_async_runtime")]
57pub mod periodic_reader_with_async_runtime;
59pub(crate) mod pipeline;
60#[cfg(feature = "experimental_metrics_custom_reader")]
61pub mod reader;
62#[cfg(not(feature = "experimental_metrics_custom_reader"))]
63pub(crate) mod reader;
64pub(crate) mod view;
65
66#[cfg(any(feature = "testing", test))]
68#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
69pub mod in_memory_exporter;
70#[cfg(any(feature = "testing", test))]
71#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
72pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
73
74#[cfg(feature = "spec_unstable_metrics_views")]
75pub use aggregation::*;
76#[cfg(feature = "experimental_metrics_custom_reader")]
77pub use manual_reader::*;
78pub use meter_provider::*;
79pub use periodic_reader::*;
80#[cfg(feature = "experimental_metrics_custom_reader")]
81pub use pipeline::Pipeline;
82
83pub use instrument::{Instrument, InstrumentKind, Stream, StreamBuilder};
84
85use std::hash::Hash;
86
/// Defines the window over which metric aggregations are reported.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Temporality {
    /// Aggregations are reported over the entire lifetime of the
    /// instrument (this is the default).
    #[default]
    Cumulative,

    /// Aggregations are reported relative to the previous collection.
    Delta,

    /// NOTE(review): presumably selects delta temporality for synchronous
    /// instruments so aggregation state can be dropped between collections
    /// and memory use stays bounded — confirm against the reader/exporter
    /// temporality-preference documentation.
    LowMemory,
}
109
110#[cfg(all(test, feature = "testing"))]
111mod tests {
112 use self::data::{HistogramDataPoint, ScopeMetrics, SumDataPoint};
113 use super::data::MetricData;
114 use super::internal::Number;
115 use super::*;
116 use crate::metrics::data::ResourceMetrics;
117 use crate::metrics::internal::AggregatedMetricsAccess;
118 use crate::metrics::InMemoryMetricExporter;
119 use crate::metrics::InMemoryMetricExporterBuilder;
120 use data::GaugeDataPoint;
121 use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
122 use opentelemetry::InstrumentationScope;
123 use opentelemetry::Value;
124 use opentelemetry::{metrics::MeterProvider as _, KeyValue};
125 use rand::{rngs, Rng, SeedableRng};
126 use std::cmp::{max, min};
127 use std::sync::atomic::{AtomicBool, Ordering};
128 use std::sync::{Arc, Mutex};
129 use std::thread;
130 use std::time::Duration;
131
132 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
139 #[cfg(not(feature = "experimental_metrics_disable_name_validation"))]
140 async fn invalid_instrument_config_noops() {
141 let invalid_instrument_names = vec![
144 "_startWithNoneAlphabet",
145 "utf8char锈",
146 "a".repeat(256).leak(),
147 "invalid name",
148 ];
149 for name in invalid_instrument_names {
150 let test_context = TestContext::new(Temporality::Cumulative);
151 let counter = test_context.meter().u64_counter(name).build();
152 counter.add(1, &[]);
153
154 let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
155 up_down_counter.add(1, &[]);
156
157 let gauge = test_context.meter().f64_gauge(name).build();
158 gauge.record(1.9, &[]);
159
160 let histogram = test_context.meter().f64_histogram(name).build();
161 histogram.record(1.0, &[]);
162
163 let _observable_counter = test_context
164 .meter()
165 .u64_observable_counter(name)
166 .with_callback(move |observer| {
167 observer.observe(1, &[]);
168 })
169 .build();
170
171 let _observable_gauge = test_context
172 .meter()
173 .f64_observable_gauge(name)
174 .with_callback(move |observer| {
175 observer.observe(1.0, &[]);
176 })
177 .build();
178
179 let _observable_up_down_counter = test_context
180 .meter()
181 .i64_observable_up_down_counter(name)
182 .with_callback(move |observer| {
183 observer.observe(1, &[]);
184 })
185 .build();
186
187 test_context.flush_metrics();
188
189 test_context.check_no_metrics();
191 }
192
193 let invalid_bucket_boundaries = vec![
194 vec![1.0, 1.0], vec![1.0, 2.0, 3.0, 2.0], vec![1.0, 2.0, 3.0, 4.0, 2.5], vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0], vec![1.0, 2.0, 3.0, f64::NAN], vec![f64::NEG_INFINITY, 2.0, 3.0], ];
201 for bucket_boundaries in invalid_bucket_boundaries {
202 let test_context = TestContext::new(Temporality::Cumulative);
203 let histogram = test_context
204 .meter()
205 .f64_histogram("test")
206 .with_boundaries(bucket_boundaries)
207 .build();
208 histogram.record(1.9, &[]);
209 test_context.flush_metrics();
210
211 test_context.check_no_metrics();
214 }
215 }
216
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[cfg(feature = "experimental_metrics_disable_name_validation")]
    async fn valid_instrument_config_with_feature_experimental_metrics_disable_name_validation() {
        // With name validation disabled, instruments built with otherwise
        // invalid names must still record, and their metrics must be exported.
        let invalid_instrument_names = vec![
            "_startWithNoneAlphabet",
            "utf8char锈",
            "",
            "a".repeat(256).leak(),
            "\\allow\\slash /sec",
            "\\allow\\$$slash /sec",
            "Total $ Count",
            "\\test\\UsagePercent(Total) > 80%",
            "invalid name",
        ];
        for name in invalid_instrument_names {
            let test_context = TestContext::new(Temporality::Cumulative);
            // One instrument of every kind, all sharing the same name.
            let counter = test_context.meter().u64_counter(name).build();
            counter.add(1, &[]);

            let up_down_counter = test_context.meter().i64_up_down_counter(name).build();
            up_down_counter.add(1, &[]);

            let gauge = test_context.meter().f64_gauge(name).build();
            gauge.record(1.9, &[]);

            let histogram = test_context.meter().f64_histogram(name).build();
            histogram.record(1.0, &[]);

            let _observable_counter = test_context
                .meter()
                .u64_observable_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            let _observable_gauge = test_context
                .meter()
                .f64_observable_gauge(name)
                .with_callback(move |observer| {
                    observer.observe(1.0, &[]);
                })
                .build();

            let _observable_up_down_counter = test_context
                .meter()
                .i64_observable_up_down_counter(name)
                .with_callback(move |observer| {
                    observer.observe(1, &[]);
                })
                .build();

            test_context.flush_metrics();

            // Unlike the validation-enabled test above, metrics MUST be
            // exported here.
            let resource_metrics = test_context
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert!(!resource_metrics.is_empty(), "metrics should be exported");
        }

        // Invalid bucket boundaries still cause the histogram to export
        // nothing, even with name validation disabled.
        let invalid_bucket_boundaries = vec![
            vec![1.0, 1.0],
            vec![1.0, 2.0, 3.0, 2.0],
            vec![1.0, 2.0, 3.0, 4.0, 2.5],
            vec![1.0, 2.0, 3.0, f64::INFINITY, 4.0],
            vec![1.0, 2.0, 3.0, f64::NAN],
            vec![f64::NEG_INFINITY, 2.0, 3.0],
        ];
        for bucket_boundaries in invalid_bucket_boundaries {
            let test_context = TestContext::new(Temporality::Cumulative);
            let histogram = test_context
                .meter()
                .f64_histogram("test")
                .with_boundaries(bucket_boundaries)
                .build();
            histogram.record(1.9, &[]);
            test_context.flush_metrics();

            test_context.check_no_metrics();
        }
    }
307
    // Shared counter-aggregation scenario under delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_delta() {
        counter_aggregation_helper(Temporality::Delta);
    }

    // Shared counter-aggregation scenario under cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_cumulative() {
        counter_aggregation_helper(Temporality::Cumulative);
    }
321
322 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
323 async fn counter_aggregation_no_attributes_cumulative() {
324 let mut test_context = TestContext::new(Temporality::Cumulative);
325 let counter = test_context.u64_counter("test", "my_counter", None);
326
327 counter.add(50, &[]);
328 test_context.flush_metrics();
329
330 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
331 unreachable!()
332 };
333
334 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
335 assert!(sum.is_monotonic, "Should produce monotonic.");
336 assert_eq!(
337 sum.temporality,
338 Temporality::Cumulative,
339 "Should produce cumulative"
340 );
341
342 let data_point = &sum.data_points[0];
343 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
344 assert_eq!(data_point.value, 50, "Unexpected data point value");
345 }
346
347 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
348 async fn counter_aggregation_no_attributes_delta() {
349 let mut test_context = TestContext::new(Temporality::Delta);
350 let counter = test_context.u64_counter("test", "my_counter", None);
351
352 counter.add(50, &[]);
353 test_context.flush_metrics();
354
355 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
356 unreachable!()
357 };
358
359 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
360 assert!(sum.is_monotonic, "Should produce monotonic.");
361 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
362
363 let data_point = &sum.data_points[0];
364 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
365 assert_eq!(data_point.value, 50, "Unexpected data point value");
366 }
367
    // Counter overflow scenarios (default and custom cardinality limit
    // helpers, defined elsewhere in this file) under delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_delta() {
        counter_aggregation_overflow_helper(Temporality::Delta);
        counter_aggregation_overflow_helper_custom_limit(Temporality::Delta);
    }

    // Same overflow scenarios under cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_overflow_cumulative() {
        counter_aggregation_overflow_helper(Temporality::Cumulative);
        counter_aggregation_overflow_helper_custom_limit(Temporality::Cumulative);
    }
379
    // Attribute-order scenarios: per the test names, the helper's boolean
    // selects whether the sorted attribute order is recorded first
    // (helper defined elsewhere in this file).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_delta() {
        counter_aggregation_attribute_order_helper(Temporality::Delta, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_sorted_first_cumulative() {
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_delta() {
        counter_aggregation_attribute_order_helper(Temporality::Delta, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order_unsorted_first_cumulative() {
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
    }
409
    // Shared histogram scenario under cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_cumulative() {
        histogram_aggregation_helper(Temporality::Cumulative);
    }

    // Shared histogram scenario under delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_delta() {
        histogram_aggregation_helper(Temporality::Delta);
    }

    // Custom bucket boundaries, both temporalities.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_custom_bounds() {
        histogram_aggregation_with_custom_bounds_helper(Temporality::Delta);
        histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative);
    }

    // Empty bucket boundary list, both temporalities.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_empty_bounds() {
        histogram_aggregation_with_empty_bounds_helper(Temporality::Delta);
        histogram_aggregation_with_empty_bounds_helper(Temporality::Cumulative);
    }
439
    // Shared up-down-counter scenario under cumulative temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_cumulative() {
        updown_counter_aggregation_helper(Temporality::Cumulative);
    }

    // Shared up-down-counter scenario under delta temporality.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_delta() {
        updown_counter_aggregation_helper(Temporality::Delta);
    }
453
    // Gauge scenario under both temporalities (helper defined elsewhere
    // in this file).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn gauge_aggregation() {
        gauge_aggregation_helper(Temporality::Delta);
        gauge_aggregation_helper(Temporality::Cumulative);
    }

    // Observable gauge: both temporalities, with the helper's boolean
    // toggled both ways (presumably the with/without-attributes flag —
    // see the helper's definition elsewhere in this file).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_gauge_aggregation() {
        observable_gauge_aggregation_helper(Temporality::Delta, false);
        observable_gauge_aggregation_helper(Temporality::Delta, true);
        observable_gauge_aggregation_helper(Temporality::Cumulative, false);
        observable_gauge_aggregation_helper(Temporality::Cumulative, true);
    }
475
    // Matrix of observable-counter cases driven through
    // `observable_counter_aggregation_helper` (defined below):
    // temporality × zero/non-zero increment × with/without attributes,
    // always with start = 100 and length = 4.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
    }
531
532 fn observable_counter_aggregation_helper(
533 temporality: Temporality,
534 start: u64,
535 increment: u64,
536 length: u64,
537 is_empty_attributes: bool,
538 ) {
539 let mut test_context = TestContext::new(temporality);
541 let attributes = if is_empty_attributes {
542 vec![]
543 } else {
544 vec![KeyValue::new("key1", "value1")]
545 };
546 let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
548 println!("Testing with observable values: {:?}", values);
549 let values = Arc::new(values);
550 let values_clone = values.clone();
551 let i = Arc::new(Mutex::new(0));
552 let _observable_counter = test_context
553 .meter()
554 .u64_observable_counter("my_observable_counter")
555 .with_unit("my_unit")
556 .with_callback(move |observer| {
557 let mut index = i.lock().unwrap();
558 if *index < values.len() {
559 observer.observe(values[*index], &attributes);
560 *index += 1;
561 }
562 })
563 .build();
564
565 for (iter, v) in values_clone.iter().enumerate() {
566 test_context.flush_metrics();
567 let MetricData::Sum(sum) =
568 test_context.get_aggregation::<u64>("my_observable_counter", None)
569 else {
570 unreachable!()
571 };
572 assert_eq!(sum.data_points.len(), 1);
573 assert!(sum.is_monotonic, "Counter should produce monotonic.");
574 if let Temporality::Cumulative = temporality {
575 assert_eq!(
576 sum.temporality,
577 Temporality::Cumulative,
578 "Should produce cumulative"
579 );
580 } else {
581 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
582 }
583
584 let data_point = if is_empty_attributes {
586 &sum.data_points[0]
587 } else {
588 find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
589 .expect("datapoint with key1=value1 expected")
590 };
591
592 if let Temporality::Cumulative = temporality {
593 assert_eq!(data_point.value, *v);
595 } else {
596 if iter == 0 {
599 assert_eq!(data_point.value, start);
600 } else {
601 assert_eq!(data_point.value, increment);
602 }
603 }
604
605 test_context.reset_metrics();
606 }
607 }
608
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn empty_meter_name_retained() {
        // A meter created with an empty name must keep that empty name on
        // export, whether created via `meter("")` or via an
        // `InstrumentationScope` built with an empty name.
        async fn meter_name_retained_helper(
            meter: Meter,
            provider: SdkMeterProvider,
            exporter: InMemoryMetricExporter,
        ) {
            // Record once and flush so the exporter has finished metrics.
            let counter = meter.u64_counter("my_counter").build();

            counter.add(10, &[]);
            provider.force_flush().unwrap();

            let resource_metrics = exporter
                .get_finished_metrics()
                .expect("metrics are expected to be exported.");
            assert!(
                resource_metrics[0].scope_metrics[0].metrics.len() == 1,
                "There should be a single metric"
            );
            // The scope name must be the empty string the meter was built with.
            let meter_name = resource_metrics[0].scope_metrics[0].scope.name();
            assert_eq!(meter_name, "");
        }

        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Case 1: empty name passed directly to `meter()`.
        let meter1 = meter_provider.meter("");
        meter_name_retained_helper(meter1, meter_provider.clone(), exporter.clone()).await;

        // Case 2: empty name via an explicit InstrumentationScope.
        let meter_scope = InstrumentationScope::builder("").build();
        let meter2 = meter_provider.meter_with_scope(meter_scope);
        meter_name_retained_helper(meter2, meter_provider, exporter).await;
    }
647
648 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
649 async fn counter_duplicate_instrument_merge() {
650 let exporter = InMemoryMetricExporter::default();
652 let meter_provider = SdkMeterProvider::builder()
653 .with_periodic_exporter(exporter.clone())
654 .build();
655
656 let meter = meter_provider.meter("test");
658 let counter = meter
659 .u64_counter("my_counter")
660 .with_unit("my_unit")
661 .with_description("my_description")
662 .build();
663
664 let counter_duplicated = meter
665 .u64_counter("my_counter")
666 .with_unit("my_unit")
667 .with_description("my_description")
668 .build();
669
670 let attribute = vec![KeyValue::new("key1", "value1")];
671 counter.add(10, &attribute);
672 counter_duplicated.add(5, &attribute);
673
674 meter_provider.force_flush().unwrap();
675
676 let resource_metrics = exporter
678 .get_finished_metrics()
679 .expect("metrics are expected to be exported.");
680 assert!(
681 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
682 "There should be single metric merging duplicate instruments"
683 );
684 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
685 assert_eq!(metric.name, "my_counter");
686 assert_eq!(metric.unit, "my_unit");
687 let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
688 .expect("Sum aggregation expected for Counter instruments by default")
689 else {
690 unreachable!()
691 };
692
693 assert_eq!(sum.data_points.len(), 1);
695
696 let datapoint = &sum.data_points[0];
697 assert_eq!(datapoint.value, 15);
698 }
699
700 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
701 async fn counter_duplicate_instrument_different_meter_no_merge() {
702 let exporter = InMemoryMetricExporter::default();
704 let meter_provider = SdkMeterProvider::builder()
705 .with_periodic_exporter(exporter.clone())
706 .build();
707
708 let meter1 = meter_provider.meter("test.meter1");
710 let meter2 = meter_provider.meter("test.meter2");
711 let counter1 = meter1
712 .u64_counter("my_counter")
713 .with_unit("my_unit")
714 .with_description("my_description")
715 .build();
716
717 let counter2 = meter2
718 .u64_counter("my_counter")
719 .with_unit("my_unit")
720 .with_description("my_description")
721 .build();
722
723 let attribute = vec![KeyValue::new("key1", "value1")];
724 counter1.add(10, &attribute);
725 counter2.add(5, &attribute);
726
727 meter_provider.force_flush().unwrap();
728
729 let resource_metrics = exporter
731 .get_finished_metrics()
732 .expect("metrics are expected to be exported.");
733 assert!(
734 resource_metrics[0].scope_metrics.len() == 2,
735 "There should be 2 separate scope"
736 );
737 assert!(
738 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
739 "There should be single metric for the scope"
740 );
741 assert!(
742 resource_metrics[0].scope_metrics[1].metrics.len() == 1,
743 "There should be single metric for the scope"
744 );
745
746 let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
747 let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
748
749 if let Some(scope1) = scope1 {
750 let metric1 = &scope1.metrics[0];
751 assert_eq!(metric1.name, "my_counter");
752 assert_eq!(metric1.unit, "my_unit");
753 assert_eq!(metric1.description, "my_description");
754 let MetricData::Sum(sum1) = u64::extract_metrics_data_ref(&metric1.data)
755 .expect("Sum aggregation expected for Counter instruments by default")
756 else {
757 unreachable!()
758 };
759
760 assert_eq!(sum1.data_points.len(), 1);
762
763 let datapoint1 = &sum1.data_points[0];
764 assert_eq!(datapoint1.value, 10);
765 } else {
766 panic!("No MetricScope found for 'test.meter1'");
767 }
768
769 if let Some(scope2) = scope2 {
770 let metric2 = &scope2.metrics[0];
771 assert_eq!(metric2.name, "my_counter");
772 assert_eq!(metric2.unit, "my_unit");
773 assert_eq!(metric2.description, "my_description");
774
775 let MetricData::Sum(sum2) = u64::extract_metrics_data_ref(&metric2.data)
776 .expect("Sum aggregation expected for Counter instruments by default")
777 else {
778 unreachable!()
779 };
780
781 assert_eq!(sum2.data_points.len(), 1);
783
784 let datapoint2 = &sum2.data_points[0];
785 assert_eq!(datapoint2.value, 5);
786 } else {
787 panic!("No MetricScope found for 'test.meter2'");
788 }
789 }
790
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn instrumentation_scope_identity_test() {
        // Two meters created from scopes with identical name, version,
        // schema URL and attributes must resolve to the SAME scope, so their
        // identically-configured instruments merge into one metric.
        let exporter = InMemoryMetricExporter::default();
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .build();

        // Builds a scope that differs only in the supplied attributes.
        let make_scope = |attributes| {
            InstrumentationScope::builder("test.meter")
                .with_version("v0.1.0")
                .with_schema_url("http://example.com")
                .with_attributes(attributes)
                .build()
        };

        let meter1 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
        let meter2 =
            meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));

        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .build();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {:?}", resource_metrics);
        assert!(
            resource_metrics[0].scope_metrics.len() == 1,
            "There should be a single scope as the meters are identical"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be single metric for the scope as instruments are identical"
        );

        // The merged scope must retain the full identity it was created with.
        let scope = &resource_metrics[0].scope_metrics[0].scope;
        assert_eq!(scope.name(), "test.meter");
        assert_eq!(scope.version(), Some("v0.1.0"));
        assert_eq!(scope.schema_url(), Some("http://example.com"));

        assert!(scope.attributes().eq(&[KeyValue::new("key", "value1")]));

        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        assert_eq!(metric.description, "my_description");

        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for Counter instruments by default")
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1);

        // 10 + 5 recorded through the two merged instruments.
        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }
873
874 #[cfg(feature = "spec_unstable_metrics_views")]
875 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
876 async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
877 let exporter = InMemoryMetricExporter::default();
882 let view = |i: &Instrument| {
883 if i.name == "test_histogram" {
884 Stream::builder()
885 .with_aggregation(aggregation::Aggregation::ExplicitBucketHistogram {
886 boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], record_min_max: false,
888 })
889 .with_name("test_histogram_renamed")
890 .with_unit("test_unit_renamed")
891 .build()
892 .ok()
893 } else {
894 None
895 }
896 };
897 let meter_provider = SdkMeterProvider::builder()
898 .with_periodic_exporter(exporter.clone())
899 .with_view(view)
900 .build();
901
902 let meter = meter_provider.meter("test");
904 let histogram = meter
905 .f64_histogram("test_histogram")
906 .with_unit("test_unit")
907 .build();
908
909 histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
910 meter_provider.force_flush().unwrap();
911
912 let resource_metrics = exporter
914 .get_finished_metrics()
915 .expect("metrics are expected to be exported.");
916 assert!(!resource_metrics.is_empty());
917 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
918 assert_eq!(
919 metric.name, "test_histogram",
920 "View rename should be ignored and original name retained."
921 );
922 assert_eq!(
923 metric.unit, "test_unit",
924 "View rename of unit should be ignored and original unit retained."
925 );
926 }
927
    #[cfg(feature = "spec_unstable_metrics_views")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "Spatial aggregation is not yet implemented."]
    async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
        // A view that allows no attribute keys should collapse the three
        // observations (100 each) into one attribute-less data point of 300.
        let exporter = InMemoryMetricExporter::default();
        // View that strips every attribute from "my_observable_counter".
        let view = |i: &Instrument| {
            if i.name == "my_observable_counter" {
                Stream::builder()
                    .with_allowed_attribute_keys(vec![])
                    .build()
                    .ok()
            } else {
                None
            }
        };
        let meter_provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter.clone())
            .with_view(view)
            .build();

        let meter = meter_provider.meter("test");
        // Three observations of 100, each with a distinct attribute set.
        let _observable_counter = meter
            .u64_observable_counter("my_observable_counter")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "get"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "post"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "500"),
                        KeyValue::new("verb", "get"),
                    ],
                );
            })
            .build();

        meter_provider.force_flush().unwrap();

        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_observable_counter",);

        let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
            .expect("Sum aggregation expected for ObservableCounter instruments by default")
        else {
            unreachable!()
        };

        // Spatial aggregation: one data point summing all observations.
        assert_eq!(sum.data_points.len(), 1);

        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 300);
    }
1008
1009 #[cfg(feature = "spec_unstable_metrics_views")]
1010 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1011 async fn spatial_aggregation_when_view_drops_attributes_counter() {
1012 let exporter = InMemoryMetricExporter::default();
1016 let view = |i: &Instrument| {
1018 if i.name == "my_counter" {
1019 Some(
1020 Stream::builder()
1021 .with_allowed_attribute_keys(vec![])
1022 .build()
1023 .unwrap(),
1024 )
1025 } else {
1026 None
1027 }
1028 };
1029 let meter_provider = SdkMeterProvider::builder()
1030 .with_periodic_exporter(exporter.clone())
1031 .with_view(view)
1032 .build();
1033
1034 let meter = meter_provider.meter("test");
1036 let counter = meter.u64_counter("my_counter").build();
1037
1038 counter.add(
1041 10,
1042 [
1043 KeyValue::new("statusCode", "200"),
1044 KeyValue::new("verb", "Get"),
1045 ]
1046 .as_ref(),
1047 );
1048
1049 counter.add(
1050 10,
1051 [
1052 KeyValue::new("statusCode", "500"),
1053 KeyValue::new("verb", "Get"),
1054 ]
1055 .as_ref(),
1056 );
1057
1058 counter.add(
1059 10,
1060 [
1061 KeyValue::new("statusCode", "200"),
1062 KeyValue::new("verb", "Post"),
1063 ]
1064 .as_ref(),
1065 );
1066
1067 meter_provider.force_flush().unwrap();
1068
1069 let resource_metrics = exporter
1071 .get_finished_metrics()
1072 .expect("metrics are expected to be exported.");
1073 assert!(!resource_metrics.is_empty());
1074 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1075 assert_eq!(metric.name, "my_counter",);
1076
1077 let MetricData::Sum(sum) = u64::extract_metrics_data_ref(&metric.data)
1078 .expect("Sum aggregation expected for Counter instruments by default")
1079 else {
1080 unreachable!()
1081 };
1082
1083 assert_eq!(sum.data_points.len(), 1);
1087 let data_point = &sum.data_points[0];
1089 assert_eq!(data_point.value, 30);
1090 }
1091
1092 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1093 async fn no_attr_cumulative_up_down_counter() {
1094 let mut test_context = TestContext::new(Temporality::Cumulative);
1095 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
1096
1097 counter.add(50, &[]);
1098 test_context.flush_metrics();
1099
1100 let MetricData::Sum(sum) =
1101 test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
1102 else {
1103 unreachable!()
1104 };
1105
1106 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1107 assert!(!sum.is_monotonic, "Should not produce monotonic.");
1108 assert_eq!(
1109 sum.temporality,
1110 Temporality::Cumulative,
1111 "Should produce cumulative"
1112 );
1113
1114 let data_point = &sum.data_points[0];
1115 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1116 assert_eq!(data_point.value, 50, "Unexpected data point value");
1117 }
1118
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_up_down_counter_always_cumulative() {
        // Even when the reader is configured for delta temporality, up-down
        // counters are exported as cumulative.
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) =
            test_context.get_aggregation::<i64>("my_counter", Some("my_unit"))
        else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce Cumulative due to UpDownCounter temporality_preference"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }
1145
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_counter_value_added_after_export() {
        // Cumulative counters keep accumulating across exports: after
        // exporting 50, adding 5 must yield 55 on the next export.
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<u64>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        // 50 (before reset) + 5 (after) — the running total survives export.
        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 55, "Unexpected data point value");
    }
1174
1175 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1176 async fn no_attr_delta_counter_value_reset_after_export() {
1177 let mut test_context = TestContext::new(Temporality::Delta);
1178 let counter = test_context.u64_counter("test", "my_counter", None);
1179
1180 counter.add(50, &[]);
1181 test_context.flush_metrics();
1182 let _ = test_context.get_aggregation::<u64>("my_counter", None);
1183 test_context.reset_metrics();
1184
1185 counter.add(5, &[]);
1186 test_context.flush_metrics();
1187 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1188 unreachable!()
1189 };
1190
1191 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
1192 assert!(sum.is_monotonic, "Should produce monotonic.");
1193 assert_eq!(
1194 sum.temporality,
1195 Temporality::Delta,
1196 "Should produce cumulative"
1197 );
1198
1199 let data_point = &sum.data_points[0];
1200 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
1201 assert_eq!(data_point.value, 5, "Unexpected data point value");
1202 }
1203
1204 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1205 async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
1206 let mut test_context = TestContext::new(Temporality::Delta);
1207 let counter = test_context.u64_counter("test", "my_counter", None);
1208
1209 counter.add(50, &[]);
1210 test_context.flush_metrics();
1211 let _ = test_context.get_aggregation::<u64>("my_counter", None);
1212 test_context.reset_metrics();
1213
1214 counter.add(50, &[KeyValue::new("a", "b")]);
1215 test_context.flush_metrics();
1216 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1217 unreachable!()
1218 };
1219
1220 let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
1221
1222 assert!(
1223 no_attr_data_point.is_none(),
1224 "Expected no data points with no attributes"
1225 );
1226 }
1227
1228 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1229 async fn delta_memory_efficiency_test() {
1230 let mut test_context = TestContext::new(Temporality::Delta);
1235 let counter = test_context.u64_counter("test", "my_counter", None);
1236
1237 counter.add(1, &[KeyValue::new("key1", "value1")]);
1239 counter.add(1, &[KeyValue::new("key1", "value1")]);
1240 counter.add(1, &[KeyValue::new("key1", "value1")]);
1241 counter.add(1, &[KeyValue::new("key1", "value1")]);
1242 counter.add(1, &[KeyValue::new("key1", "value1")]);
1243
1244 counter.add(1, &[KeyValue::new("key1", "value2")]);
1245 counter.add(1, &[KeyValue::new("key1", "value2")]);
1246 counter.add(1, &[KeyValue::new("key1", "value2")]);
1247 test_context.flush_metrics();
1248
1249 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
1250 unreachable!()
1251 };
1252
1253 assert_eq!(sum.data_points.len(), 2);
1255
1256 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1258 .expect("datapoint with key1=value1 expected");
1259 assert_eq!(data_point1.value, 5);
1260
1261 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1263 .expect("datapoint with key1=value2 expected");
1264 assert_eq!(data_point1.value, 3);
1265
1266 test_context.exporter.reset();
1267 test_context.flush_metrics();
1270
1271 let resource_metrics = test_context
1272 .exporter
1273 .get_finished_metrics()
1274 .expect("metrics are expected to be exported.");
1275 println!("resource_metrics: {:?}", resource_metrics);
1276 assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1277 }
1278
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_multithreaded() {
        // Concurrent u64 counter updates must aggregate correctly under both
        // Delta and Cumulative temporalities.
        counter_multithreaded_aggregation_helper(Temporality::Delta);
        counter_multithreaded_aggregation_helper(Temporality::Cumulative);
    }
1287
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_f64_multithreaded() {
        // f64 variant of the multithreaded counter test, for both temporalities.
        counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
        counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
    }
1296
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_multithreaded() {
        // Concurrent u64 histogram records must aggregate correctly under both
        // Delta and Cumulative temporalities.
        histogram_multithreaded_aggregation_helper(Temporality::Delta);
        histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
    }
1305
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_f64_multithreaded() {
        // f64 variant of the multithreaded histogram test, for both temporalities.
        histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
        histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
    }
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
        // Cumulative exports must repeat previously-reported values even when no
        // new measurements occur between collects; check every sync instrument kind.
        synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
        synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
        synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
        synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
    }
1324
    /// Records one no-attribute and one attributed measurement on the requested
    /// synchronous instrument kind, then verifies the exported aggregation twice:
    /// once right after recording, and once more after a collect with no new
    /// measurements (the "gap") — cumulative temporality must re-export the
    /// same values both times.
    fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
        instrument_name: &'static str,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let attributes = &[KeyValue::new("key1", "value1")];

        // Record the two measurements on the chosen instrument kind. The values
        // differ per kind so a mixed-up export would be caught below.
        match instrument_name {
            "counter" => {
                let counter = test_context.meter().u64_counter("test_counter").build();
                counter.add(5, &[]);
                counter.add(10, attributes);
            }
            "updown_counter" => {
                let updown_counter = test_context
                    .meter()
                    .i64_up_down_counter("test_updowncounter")
                    .build();
                updown_counter.add(15, &[]);
                updown_counter.add(20, attributes);
            }
            "histogram" => {
                let histogram = test_context.meter().u64_histogram("test_histogram").build();
                histogram.record(25, &[]);
                histogram.record(30, attributes);
            }
            "gauge" => {
                let gauge = test_context.meter().u64_gauge("test_gauge").build();
                gauge.record(35, &[]);
                gauge.record(40, attributes);
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        test_context.flush_metrics();

        // First export: values exactly as recorded.
        assert_correct_export(&mut test_context, instrument_name);

        test_context.reset_metrics();

        // Second collect with no new measurements in between.
        test_context.flush_metrics();

        // Cumulative temporality must still report the same values.
        assert_correct_export(&mut test_context, instrument_name);

        // Verifies the exported aggregation for the given instrument kind:
        // two data points (no-attribute + key1=value1) with the recorded values.
        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<u64>("test_counter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<i64>("test_updowncounter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "histogram" => {
                    let MetricData::Histogram(histogram_data) =
                        test_context.get_aggregation::<u64>("test_histogram", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(histogram_data.data_points.len(), 2);
                    // One recording per data point, so count=1 and min=max=sum.
                    let zero_attribute_datapoint =
                        find_histogram_datapoint_with_no_attributes(&histogram_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.count, 1);
                    assert_eq!(zero_attribute_datapoint.sum, 25);
                    assert_eq!(zero_attribute_datapoint.min, Some(25));
                    assert_eq!(zero_attribute_datapoint.max, Some(25));
                    let data_point1 = find_histogram_datapoint_with_key_value(
                        &histogram_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.count, 1);
                    assert_eq!(data_point1.sum, 30);
                    assert_eq!(data_point1.min, Some(30));
                    assert_eq!(data_point1.max, Some(30));
                }
                "gauge" => {
                    let MetricData::Gauge(gauge_data) =
                        test_context.get_aggregation::<u64>("test_gauge", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 35);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 40);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }
1454
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement() {
        // Second argument is `should_not_emit`: once the callback stops observing,
        // a gauge emits nothing on the next collect, while observable counters and
        // updown-counters keep re-exporting the last cumulative value.
        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "gauge", true,
        );
        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "counter", false,
        );
        asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
            "updown_counter",
            false,
        );
    }
1473
1474 #[test]
1475 fn view_test_rename() {
1476 test_view_customization(
1477 |i| {
1478 if i.name == "my_counter" {
1479 Some(
1480 Stream::builder()
1481 .with_name("my_counter_renamed")
1482 .build()
1483 .unwrap(),
1484 )
1485 } else {
1486 None
1487 }
1488 },
1489 "my_counter_renamed",
1490 "my_unit",
1491 "my_description",
1492 )
1493 }
1494
1495 #[test]
1496 fn view_test_change_unit() {
1497 test_view_customization(
1498 |i| {
1499 if i.name == "my_counter" {
1500 Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1501 } else {
1502 None
1503 }
1504 },
1505 "my_counter",
1506 "my_unit_new",
1507 "my_description",
1508 )
1509 }
1510
1511 #[test]
1512 fn view_test_change_description() {
1513 test_view_customization(
1514 |i| {
1515 if i.name == "my_counter" {
1516 Some(
1517 Stream::builder()
1518 .with_description("my_description_new")
1519 .build()
1520 .unwrap(),
1521 )
1522 } else {
1523 None
1524 }
1525 },
1526 "my_counter",
1527 "my_unit",
1528 "my_description_new",
1529 )
1530 }
1531
1532 #[test]
1533 fn view_test_change_name_unit() {
1534 test_view_customization(
1535 |i| {
1536 if i.name == "my_counter" {
1537 Some(
1538 Stream::builder()
1539 .with_name("my_counter_renamed")
1540 .with_unit("my_unit_new")
1541 .build()
1542 .unwrap(),
1543 )
1544 } else {
1545 None
1546 }
1547 },
1548 "my_counter_renamed",
1549 "my_unit_new",
1550 "my_description",
1551 )
1552 }
1553
1554 #[test]
1555 fn view_test_change_name_unit_desc() {
1556 test_view_customization(
1557 |i| {
1558 if i.name == "my_counter" {
1559 Some(
1560 Stream::builder()
1561 .with_name("my_counter_renamed")
1562 .with_unit("my_unit_new")
1563 .with_description("my_description_new")
1564 .build()
1565 .unwrap(),
1566 )
1567 } else {
1568 None
1569 }
1570 },
1571 "my_counter_renamed",
1572 "my_unit_new",
1573 "my_description_new",
1574 )
1575 }
1576
1577 #[test]
1578 fn view_test_match_unit() {
1579 test_view_customization(
1580 |i| {
1581 if i.unit == "my_unit" {
1582 Some(Stream::builder().with_unit("my_unit_new").build().unwrap())
1583 } else {
1584 None
1585 }
1586 },
1587 "my_counter",
1588 "my_unit_new",
1589 "my_description",
1590 )
1591 }
1592
1593 #[test]
1594 fn view_test_match_none() {
1595 test_view_customization(
1596 |i| {
1597 if i.name == "not_expected_to_match" {
1598 Some(Stream::builder().build().unwrap())
1599 } else {
1600 None
1601 }
1602 },
1603 "my_counter",
1604 "my_unit",
1605 "my_description",
1606 )
1607 }
1608
1609 #[test]
1610 fn view_test_match_multiple() {
1611 test_view_customization(
1612 |i| {
1613 if i.name == "my_counter" && i.unit == "my_unit" {
1614 Some(
1615 Stream::builder()
1616 .with_name("my_counter_renamed")
1617 .build()
1618 .unwrap(),
1619 )
1620 } else {
1621 None
1622 }
1623 },
1624 "my_counter_renamed",
1625 "my_unit",
1626 "my_description",
1627 )
1628 }
1629
1630 fn test_view_customization<F>(
1632 view_function: F,
1633 expected_name: &str,
1634 expected_unit: &str,
1635 expected_description: &str,
1636 ) where
1637 F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
1638 {
1639 let exporter = InMemoryMetricExporter::default();
1644 let meter_provider = SdkMeterProvider::builder()
1645 .with_periodic_exporter(exporter.clone())
1646 .with_view(view_function)
1647 .build();
1648
1649 let meter = meter_provider.meter("test");
1651 let counter = meter
1652 .f64_counter("my_counter")
1653 .with_unit("my_unit")
1654 .with_description("my_description")
1655 .build();
1656
1657 counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1658 meter_provider.force_flush().unwrap();
1659
1660 let resource_metrics = exporter
1662 .get_finished_metrics()
1663 .expect("metrics are expected to be exported.");
1664 assert_eq!(resource_metrics.len(), 1);
1665 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1666 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1667 assert_eq!(
1668 metric.name, expected_name,
1669 "Expected name: {}.",
1670 expected_name
1671 );
1672 assert_eq!(
1673 metric.unit, expected_unit,
1674 "Expected unit: {}.",
1675 expected_unit
1676 );
1677 assert_eq!(
1678 metric.description, expected_description,
1679 "Expected description: {}.",
1680 expected_description
1681 );
1682 }
1683
1684 #[test]
1691 fn test_view_single_instrument_multiple_stream() {
1692 let view1 = |i: &Instrument| {
1700 if i.name() == "my_counter" {
1701 Some(Stream::builder().with_name("my_counter_1").build().unwrap())
1702 } else {
1703 None
1704 }
1705 };
1706
1707 let view2 = |i: &Instrument| {
1708 if i.name() == "my_counter" {
1709 Some(Stream::builder().with_name("my_counter_2").build().unwrap())
1710 } else {
1711 None
1712 }
1713 };
1714
1715 let exporter = InMemoryMetricExporter::default();
1717 let meter_provider = SdkMeterProvider::builder()
1718 .with_periodic_exporter(exporter.clone())
1719 .with_view(view1)
1720 .with_view(view2)
1721 .build();
1722
1723 let meter = meter_provider.meter("test");
1725 let counter = meter.f64_counter("my_counter").build();
1726
1727 counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1728 meter_provider.force_flush().unwrap();
1729
1730 let resource_metrics = exporter
1732 .get_finished_metrics()
1733 .expect("metrics are expected to be exported.");
1734 assert_eq!(resource_metrics.len(), 1);
1735 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1736 let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1737 assert_eq!(metrics.len(), 2);
1738 assert_eq!(metrics[0].name, "my_counter_1");
1739 assert_eq!(metrics[1].name, "my_counter_2");
1740 }
1741
1742 #[test]
1743 fn test_view_multiple_instrument_single_stream() {
1744 let view = |i: &Instrument| {
1751 if i.name() == "my_counter1" || i.name() == "my_counter2" {
1752 Some(Stream::builder().with_name("my_counter").build().unwrap())
1753 } else {
1754 None
1755 }
1756 };
1757
1758 let exporter = InMemoryMetricExporter::default();
1760 let meter_provider = SdkMeterProvider::builder()
1761 .with_periodic_exporter(exporter.clone())
1762 .with_view(view)
1763 .build();
1764
1765 let meter = meter_provider.meter("test");
1767 let counter1 = meter.f64_counter("my_counter1").build();
1768 let counter2 = meter.f64_counter("my_counter2").build();
1769
1770 counter1.add(1.5, &[KeyValue::new("key1", "value1")]);
1771 counter2.add(1.5, &[KeyValue::new("key1", "value1")]);
1772 meter_provider.force_flush().unwrap();
1773
1774 let resource_metrics = exporter
1776 .get_finished_metrics()
1777 .expect("metrics are expected to be exported.");
1778 assert_eq!(resource_metrics.len(), 1);
1779 assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1780 let metrics = &resource_metrics[0].scope_metrics[0].metrics;
1781 assert_eq!(metrics.len(), 1);
1782 assert_eq!(metrics[0].name, "my_counter");
1783 }
1785
    /// Registers an asynchronous instrument whose callback observes values only
    /// on its first invocation (guarded by an `AtomicBool`), then collects twice.
    /// On the second collect — where the callback observes nothing — a gauge is
    /// expected to emit no metrics (`should_not_emit: true`), while observable
    /// counters/updown-counters re-export the last cumulative values.
    fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
        instrument_name: &'static str,
        should_not_emit: bool,
    ) {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let attributes = Arc::new([KeyValue::new("key1", "value1")]);

        // Build the requested observable instrument. Each callback observes a
        // no-attribute value and a key1=value1 value exactly once, then becomes
        // a no-op on subsequent collects.
        match instrument_name {
            "counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_counter = test_context
                    .meter()
                    .u64_observable_counter("test_counter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(5, &[]);
                            observer.observe(10, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "updown_counter" => {
                let has_run = AtomicBool::new(false);
                let _observable_up_down_counter = test_context
                    .meter()
                    .i64_observable_up_down_counter("test_updowncounter")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(15, &[]);
                            observer.observe(20, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            "gauge" => {
                let has_run = AtomicBool::new(false);
                let _observable_gauge = test_context
                    .meter()
                    .u64_observable_gauge("test_gauge")
                    .with_callback(move |observer| {
                        if !has_run.load(Ordering::SeqCst) {
                            observer.observe(25, &[]);
                            observer.observe(30, &*attributes.clone());
                            has_run.store(true, Ordering::SeqCst);
                        }
                    })
                    .build();
            }
            _ => panic!("Incorrect instrument kind provided"),
        };

        test_context.flush_metrics();

        // First collect: the callback ran, so the observed values are exported.
        assert_correct_export(&mut test_context, instrument_name);

        test_context.reset_metrics();

        // Second collect: the callback observes nothing this time.
        test_context.flush_metrics();

        if should_not_emit {
            // Gauge case: no observation means no data points at all.
            test_context.check_no_metrics();
        } else {
            // Counter/updown case: cumulative values are re-exported unchanged.
            assert_correct_export(&mut test_context, instrument_name);
        }

        // Verifies the exported aggregation for the given instrument kind:
        // two data points (no-attribute + key1=value1) with the observed values.
        fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
            match instrument_name {
                "counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<u64>("test_counter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    assert!(sum.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 5);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 10);
                }
                "updown_counter" => {
                    let MetricData::Sum(sum) =
                        test_context.get_aggregation::<i64>("test_updowncounter", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(sum.data_points.len(), 2);
                    assert!(!sum.is_monotonic);
                    let zero_attribute_datapoint =
                        find_sum_datapoint_with_no_attributes(&sum.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 15);
                    let data_point1 =
                        find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                            .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 20);
                }
                "gauge" => {
                    let MetricData::Gauge(gauge_data) =
                        test_context.get_aggregation::<u64>("test_gauge", None)
                    else {
                        unreachable!()
                    };
                    assert_eq!(gauge_data.data_points.len(), 2);
                    let zero_attribute_datapoint =
                        find_gauge_datapoint_with_no_attributes(&gauge_data.data_points)
                            .expect("datapoint with no attributes expected");
                    assert_eq!(zero_attribute_datapoint.value, 25);
                    let data_point1 = find_gauge_datapoint_with_key_value(
                        &gauge_data.data_points,
                        "key1",
                        "value1",
                    )
                    .expect("datapoint with key1=value1 expected");
                    assert_eq!(data_point1.value, 30);
                }
                _ => panic!("Incorrect instrument kind provided"),
            }
        }
    }
1916
    /// Updates a u64 counter from a spawned thread across 10 iterations while
    /// interleaving flushes on even iterations, then verifies the totals
    /// accumulated across all exported batches.
    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1, &[]);

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);

                    // Flush mid-iteration (on even i) so measurements straddle
                    // collection boundaries; the sleep varies timing between runs.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes (i = 0,2,4,6,8) + 1 final flush = 6 aggregations.
        let sums = test_context
            .get_from_multiple_aggregations::<u64>("my_counter", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Sum(sum) = data {
                    sum
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        let mut sum_zero_attributes = 0;
        let mut sum_key1_value1 = 0;
        sums.iter().for_each(|sum| {
            // Two data points per export: no-attribute and key1=value1.
            assert_eq!(sum.data_points.len(), 2);
            assert!(sum.is_monotonic, "Counter should produce monotonic.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                // Delta: each export carries only the increment — sum them all.
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                // Cumulative: each export carries the running total — keep the last.
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        // 10 iterations x 1 no-attribute add; 10 iterations x 5 key1=value1 adds.
        assert_eq!(sum_zero_attributes, 10);
        assert_eq!(sum_key1_value1, 50);
    }
1978
    /// f64 variant of the multithreaded counter helper: adds 1.23 six times per
    /// iteration (1 no-attribute + 5 attributed) for 10 iterations, flushing on
    /// even iterations, then checks the accumulated totals within a tolerance.
    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.meter().f64_counter("test_counter").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1.23, &[]);

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);

                    // Flush mid-iteration (on even i) so measurements straddle
                    // collection boundaries; the sleep varies timing between runs.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes (i = 0,2,4,6,8) + 1 final flush = 6 aggregations.
        let sums = test_context
            .get_from_multiple_aggregations::<f64>("test_counter", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Sum(sum) = data {
                    sum
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        let mut sum_zero_attributes = 0.0;
        let mut sum_key1_value1 = 0.0;
        sums.iter().for_each(|sum| {
            // Two data points per export: no-attribute and key1=value1.
            assert_eq!(sum.data_points.len(), 2);
            assert!(sum.is_monotonic, "Counter should produce monotonic.");
            assert_eq!(sum.temporality, temporality);

            if temporality == Temporality::Delta {
                // Delta: each export carries only the increment — sum them all.
                sum_zero_attributes += sum.data_points[0].value;
                sum_key1_value1 += sum.data_points[1].value;
            } else {
                // Cumulative: each export carries the running total — keep the last.
                sum_zero_attributes = sum.data_points[0].value;
                sum_key1_value1 = sum.data_points[1].value;
            };
        });

        // Expected: 10 x 1.23 = 12.3 (no attrs); 50 x 1.23 = 61.5 (key1=value1).
        // Compared with an epsilon to absorb floating-point accumulation error.
        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001);
        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001);
    }
2040
    /// Records into a u64 histogram from a spawned thread across 10 iterations
    /// (2 no-attribute records: 1 and 4; 5 key1=value1 records: 5, 7, 18, 35, 35),
    /// flushing on even iterations, then verifies count/sum/min/max and the
    /// per-bucket counts accumulated across all exported batches.
    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").build());

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    histogram.record(1, &[]);
                    histogram.record(4, &[]);

                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18, &[KeyValue::new("key1", "value1")]);

                    // Flush mid-iteration (on even i) so measurements straddle
                    // collection boundaries; the sleep varies timing between runs.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // 5 mid-loop flushes (i = 0,2,4,6,8) + 1 final flush = 6 aggregations.
        let histograms = test_context
            .get_from_multiple_aggregations::<u64>("test_histogram", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Histogram(hist) = data {
                    hist
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        // Accumulators start at identity values (0 for sums/counts, MAX/MIN for
        // min/max) so the Delta folding below works from the first batch.
        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0, 0, u64::MAX, u64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0, 0, u64::MAX, u64::MIN);

        // 16 buckets are asserted per data point below.
        let mut bucket_counts_zero_attributes = vec![0; 16];
        let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            // Two data points per export: no-attribute and key1=value1.
            assert_eq!(histogram.data_points.len(), 2);
            assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                // Delta: batches are disjoint — add sums/counts/bucket counts,
                // and combine min/max across batches.
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                // Cumulative: each batch already carries the running totals —
                // the values from the last batch are the final ones.
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // No-attribute series: 10 iterations x 2 records (1, 4) ->
        // count 20, sum 10*(1+4)=50, min 1, max 4.
        assert_eq!(count_zero_attributes, 20);
        assert_eq!(sum_zero_attributes, 50);
        assert_eq!(min_zero_attributes, 1);
        assert_eq!(max_zero_attributes, 4);

        // All 20 no-attribute records (values 1 and 4) land in bucket index 1.
        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }

        // key1=value1 series: 10 iterations x 5 records (5, 7, 18, 35, 35) ->
        // count 50, sum 10*(5+7+18+35+35)=1000, min 5, max 35.
        assert_eq!(count_key1_value1, 50);
        assert_eq!(sum_key1_value1, 1000);
        assert_eq!(min_key1_value1, 5);
        assert_eq!(max_key1_value1, 35);

        // Bucket distribution: 5 -> index 1, 7 -> index 2, 18 -> index 3,
        // and the two 35s per iteration -> index 4.
        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10),
                2 => assert_eq!(*count, 10),
                3 => assert_eq!(*count, 10),
                4 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }
    }
2183
    /// Stress-tests an f64 histogram: for 10 iterations a scoped thread records
    /// measurements, and on even iterations a flush is forced mid-stream so the
    /// measurements land in different export batches. The accumulated exports
    /// are then checked for lossless aggregation under the given temporality.
    fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").build());

        for i in 0..10 {
            // thread::scope lets the closure borrow `histogram` and
            // `test_context` without 'static bounds; the scope joins the
            // spawned thread before the next iteration starts.
            thread::scope(|s| {
                s.spawn(|| {
                    // Per iteration: 2 zero-attribute records (sum 6.1) ...
                    histogram.record(1.5, &[]);
                    histogram.record(4.6, &[]);

                    // ... and 5 records for key1=value1 (sum 100.6, including
                    // the two 35.1 records after the optional flush below).
                    histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
                    histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
                    histogram.record(18.1, &[KeyValue::new("key1", "value1")]);

                    // On even iterations, flush in the middle of recording so
                    // this iteration's measurements are split across exports.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i));
                    }

                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                    histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Expect 6 exports total: 5 mid-loop flushes (i = 0, 2, 4, 6, 8) plus
        // the final flush above.
        let histograms = test_context
            .get_from_multiple_aggregations::<f64>("test_histogram", None, 6)
            .into_iter()
            .map(|data| {
                if let MetricData::Histogram(hist) = data {
                    hist
                } else {
                    unreachable!()
                }
            })
            .collect::<Vec<_>>();

        // Accumulators across exports: Delta sums each interval; Cumulative
        // just keeps the latest snapshot (see the branch below).
        let (
            mut sum_zero_attributes,
            mut count_zero_attributes,
            mut min_zero_attributes,
            mut max_zero_attributes,
        ) = (0.0, 0, f64::MAX, f64::MIN);
        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
            (0.0, 0, f64::MAX, f64::MIN);

        // One slot per bucket; the data points are asserted below to carry
        // exactly 16 buckets each.
        let mut bucket_counts_zero_attributes = vec![0; 16];
        let mut bucket_counts_key1_value1 = vec![0; 16];

        histograms.iter().for_each(|histogram| {
            // Every export must contain exactly the two expected time series.
            assert_eq!(histogram.data_points.len(), 2);
            assert_eq!(histogram.temporality, temporality);

            let data_point_zero_attributes =
                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
            let data_point_key1_value1 =
                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
                    .unwrap();

            if temporality == Temporality::Delta {
                // Delta: each export carries only its own interval, so combine
                // exports by summing counts/sums and folding min/max.
                sum_zero_attributes += data_point_zero_attributes.sum;
                sum_key1_value1 += data_point_key1_value1.sum;

                count_zero_attributes += data_point_zero_attributes.count;
                count_key1_value1 += data_point_key1_value1.count;

                min_zero_attributes =
                    min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
                min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());

                max_zero_attributes =
                    max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
                max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
                }

                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
                }
            } else {
                // Cumulative: every export already holds the running totals,
                // so keep only the latest snapshot.
                sum_zero_attributes = data_point_zero_attributes.sum;
                sum_key1_value1 = data_point_key1_value1.sum;

                count_zero_attributes = data_point_zero_attributes.count;
                count_key1_value1 = data_point_key1_value1.count;

                min_zero_attributes = data_point_zero_attributes.min.unwrap();
                min_key1_value1 = data_point_key1_value1.min.unwrap();

                max_zero_attributes = data_point_zero_attributes.max.unwrap();
                max_key1_value1 = data_point_key1_value1.max.unwrap();

                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
            };
        });

        // Zero attributes: 2 records x 10 iterations; (1.5 + 4.6) * 10 = 61.0.
        assert_eq!(count_zero_attributes, 20);
        assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001);
        assert_eq!(min_zero_attributes, 1.5);
        assert_eq!(max_zero_attributes, 4.6);

        // Both 1.5 and 4.6 land in bucket index 1 — presumably the (0, 5]
        // bucket of the SDK's default boundaries (TODO confirm boundary set).
        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }

        // key1=value1: 5 records x 10 iterations; 100.6 * 10 = 1006.0.
        assert_eq!(count_key1_value1, 50);
        assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001);
        assert_eq!(min_key1_value1, 5.0);
        assert_eq!(max_key1_value1, 35.1);

        // Per iteration: 5.0 -> bucket 1, 7.3 -> bucket 2, 18.1 -> bucket 3,
        // 35.1 x2 -> bucket 4; times 10 iterations.
        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
            match i {
                1 => assert_eq!(*count, 10),
                2 => assert_eq!(*count, 10),
                3 => assert_eq!(*count, 10),
                4 => assert_eq!(*count, 20),
                _ => assert_eq!(*count, 0),
            }
        }
    }
2326
2327 fn histogram_aggregation_helper(temporality: Temporality) {
2328 let mut test_context = TestContext::new(temporality);
2330 let histogram = test_context.meter().u64_histogram("my_histogram").build();
2331
2332 let mut rand = rngs::SmallRng::from_os_rng();
2334 let values_kv1 = (0..50)
2335 .map(|_| rand.random_range(0..100))
2336 .collect::<Vec<u64>>();
2337 for value in values_kv1.iter() {
2338 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2339 }
2340
2341 let values_kv2 = (0..30)
2342 .map(|_| rand.random_range(0..100))
2343 .collect::<Vec<u64>>();
2344 for value in values_kv2.iter() {
2345 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2346 }
2347
2348 test_context.flush_metrics();
2349
2350 let MetricData::Histogram(histogram_data) =
2352 test_context.get_aggregation::<u64>("my_histogram", None)
2353 else {
2354 unreachable!()
2355 };
2356 assert_eq!(histogram_data.data_points.len(), 2);
2358 if let Temporality::Cumulative = temporality {
2359 assert_eq!(
2360 histogram_data.temporality,
2361 Temporality::Cumulative,
2362 "Should produce cumulative"
2363 );
2364 } else {
2365 assert_eq!(
2366 histogram_data.temporality,
2367 Temporality::Delta,
2368 "Should produce delta"
2369 );
2370 }
2371
2372 let data_point1 =
2374 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2375 .expect("datapoint with key1=value1 expected");
2376 assert_eq!(data_point1.count, values_kv1.len() as u64);
2377 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2378 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2379 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2380
2381 let data_point2 =
2382 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2383 .expect("datapoint with key1=value2 expected");
2384 assert_eq!(data_point2.count, values_kv2.len() as u64);
2385 assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
2386 assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
2387 assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
2388
2389 test_context.reset_metrics();
2391 for value in values_kv1.iter() {
2392 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
2393 }
2394
2395 for value in values_kv2.iter() {
2396 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
2397 }
2398
2399 test_context.flush_metrics();
2400
2401 let MetricData::Histogram(histogram_data) =
2402 test_context.get_aggregation::<u64>("my_histogram", None)
2403 else {
2404 unreachable!()
2405 };
2406 assert_eq!(histogram_data.data_points.len(), 2);
2407 let data_point1 =
2408 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2409 .expect("datapoint with key1=value1 expected");
2410 if temporality == Temporality::Cumulative {
2411 assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
2412 assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
2413 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2414 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2415 } else {
2416 assert_eq!(data_point1.count, values_kv1.len() as u64);
2417 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
2418 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
2419 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
2420 }
2421
2422 let data_point1 =
2423 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
2424 .expect("datapoint with key1=value1 expected");
2425 if temporality == Temporality::Cumulative {
2426 assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
2427 assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
2428 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2429 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2430 } else {
2431 assert_eq!(data_point1.count, values_kv2.len() as u64);
2432 assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
2433 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
2434 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
2435 }
2436 }
2437
2438 fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) {
2439 let mut test_context = TestContext::new(temporality);
2440 let histogram = test_context
2441 .meter()
2442 .u64_histogram("test_histogram")
2443 .with_boundaries(vec![1.0, 2.5, 5.5])
2444 .build();
2445 histogram.record(1, &[KeyValue::new("key1", "value1")]);
2446 histogram.record(2, &[KeyValue::new("key1", "value1")]);
2447 histogram.record(3, &[KeyValue::new("key1", "value1")]);
2448 histogram.record(4, &[KeyValue::new("key1", "value1")]);
2449 histogram.record(5, &[KeyValue::new("key1", "value1")]);
2450
2451 test_context.flush_metrics();
2452
2453 let MetricData::Histogram(histogram_data) =
2455 test_context.get_aggregation::<u64>("test_histogram", None)
2456 else {
2457 unreachable!()
2458 };
2459 assert_eq!(histogram_data.data_points.len(), 1);
2461 if let Temporality::Cumulative = temporality {
2462 assert_eq!(
2463 histogram_data.temporality,
2464 Temporality::Cumulative,
2465 "Should produce cumulative"
2466 );
2467 } else {
2468 assert_eq!(
2469 histogram_data.temporality,
2470 Temporality::Delta,
2471 "Should produce delta"
2472 );
2473 }
2474
2475 let data_point =
2477 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2478 .expect("datapoint with key1=value1 expected");
2479
2480 assert_eq!(data_point.count, 5);
2481 assert_eq!(data_point.sum, 15);
2482
2483 assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds);
2490 assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts);
2491 }
2492
2493 fn histogram_aggregation_with_empty_bounds_helper(temporality: Temporality) {
2494 let mut test_context = TestContext::new(temporality);
2495 let histogram = test_context
2496 .meter()
2497 .u64_histogram("test_histogram")
2498 .with_boundaries(vec![])
2499 .build();
2500 histogram.record(1, &[KeyValue::new("key1", "value1")]);
2501 histogram.record(2, &[KeyValue::new("key1", "value1")]);
2502 histogram.record(3, &[KeyValue::new("key1", "value1")]);
2503 histogram.record(4, &[KeyValue::new("key1", "value1")]);
2504 histogram.record(5, &[KeyValue::new("key1", "value1")]);
2505
2506 test_context.flush_metrics();
2507
2508 let MetricData::Histogram(histogram_data) =
2510 test_context.get_aggregation::<u64>("test_histogram", None)
2511 else {
2512 unreachable!()
2513 };
2514 assert_eq!(histogram_data.data_points.len(), 1);
2516 if let Temporality::Cumulative = temporality {
2517 assert_eq!(
2518 histogram_data.temporality,
2519 Temporality::Cumulative,
2520 "Should produce cumulative"
2521 );
2522 } else {
2523 assert_eq!(
2524 histogram_data.temporality,
2525 Temporality::Delta,
2526 "Should produce delta"
2527 );
2528 }
2529
2530 let data_point =
2532 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
2533 .expect("datapoint with key1=value1 expected");
2534
2535 assert_eq!(data_point.count, 5);
2536 assert_eq!(data_point.sum, 15);
2537 assert!(data_point.bounds.is_empty());
2538 assert!(data_point.bucket_counts.is_empty());
2539 }
2540
2541 fn gauge_aggregation_helper(temporality: Temporality) {
2542 let mut test_context = TestContext::new(temporality);
2544 let gauge = test_context.meter().i64_gauge("my_gauge").build();
2545
2546 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2548 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2549 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2550 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2551 gauge.record(4, &[KeyValue::new("key1", "value1")]);
2552
2553 gauge.record(11, &[KeyValue::new("key1", "value2")]);
2554 gauge.record(13, &[KeyValue::new("key1", "value2")]);
2555 gauge.record(6, &[KeyValue::new("key1", "value2")]);
2556
2557 test_context.flush_metrics();
2558
2559 let MetricData::Gauge(gauge_data_point) =
2561 test_context.get_aggregation::<i64>("my_gauge", None)
2562 else {
2563 unreachable!()
2564 };
2565 assert_eq!(gauge_data_point.data_points.len(), 2);
2567
2568 let data_point1 =
2570 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
2571 .expect("datapoint with key1=value1 expected");
2572 assert_eq!(data_point1.value, 4);
2573
2574 let data_point1 =
2575 find_gauge_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
2576 .expect("datapoint with key1=value2 expected");
2577 assert_eq!(data_point1.value, 6);
2578
2579 test_context.reset_metrics();
2581 gauge.record(1, &[KeyValue::new("key1", "value1")]);
2582 gauge.record(2, &[KeyValue::new("key1", "value1")]);
2583 gauge.record(11, &[KeyValue::new("key1", "value1")]);
2584 gauge.record(3, &[KeyValue::new("key1", "value1")]);
2585 gauge.record(41, &[KeyValue::new("key1", "value1")]);
2586
2587 gauge.record(34, &[KeyValue::new("key1", "value2")]);
2588 gauge.record(12, &[KeyValue::new("key1", "value2")]);
2589 gauge.record(54, &[KeyValue::new("key1", "value2")]);
2590
2591 test_context.flush_metrics();
2592
2593 let MetricData::Gauge(gauge) = test_context.get_aggregation::<i64>("my_gauge", None) else {
2594 unreachable!()
2595 };
2596 assert_eq!(gauge.data_points.len(), 2);
2597 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2598 .expect("datapoint with key1=value1 expected");
2599 assert_eq!(data_point1.value, 41);
2600
2601 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value2")
2602 .expect("datapoint with key1=value2 expected");
2603 assert_eq!(data_point1.value, 54);
2604 }
2605
2606 fn observable_gauge_aggregation_helper(temporality: Temporality, use_empty_attributes: bool) {
2607 let mut test_context = TestContext::new(temporality);
2609 let _observable_gauge = test_context
2610 .meter()
2611 .i64_observable_gauge("test_observable_gauge")
2612 .with_callback(move |observer| {
2613 if use_empty_attributes {
2614 observer.observe(1, &[]);
2615 }
2616 observer.observe(4, &[KeyValue::new("key1", "value1")]);
2617 observer.observe(5, &[KeyValue::new("key2", "value2")]);
2618 })
2619 .build();
2620
2621 test_context.flush_metrics();
2622
2623 let MetricData::Gauge(gauge) =
2625 test_context.get_aggregation::<i64>("test_observable_gauge", None)
2626 else {
2627 unreachable!()
2628 };
2629 let expected_time_series_count = if use_empty_attributes { 3 } else { 2 };
2631 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2632
2633 if use_empty_attributes {
2634 let zero_attribute_datapoint =
2636 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2637 .expect("datapoint with no attributes expected");
2638 assert_eq!(zero_attribute_datapoint.value, 1);
2639 }
2640
2641 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2643 .expect("datapoint with key1=value1 expected");
2644 assert_eq!(data_point1.value, 4);
2645
2646 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2648 .expect("datapoint with key2=value2 expected");
2649 assert_eq!(data_point2.value, 5);
2650
2651 test_context.reset_metrics();
2653
2654 test_context.flush_metrics();
2655
2656 let MetricData::Gauge(gauge) =
2657 test_context.get_aggregation::<i64>("test_observable_gauge", None)
2658 else {
2659 unreachable!()
2660 };
2661 assert_eq!(gauge.data_points.len(), expected_time_series_count);
2662
2663 if use_empty_attributes {
2664 let zero_attribute_datapoint =
2665 find_gauge_datapoint_with_no_attributes(&gauge.data_points)
2666 .expect("datapoint with no attributes expected");
2667 assert_eq!(zero_attribute_datapoint.value, 1);
2668 }
2669
2670 let data_point1 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key1", "value1")
2671 .expect("datapoint with key1=value1 expected");
2672 assert_eq!(data_point1.value, 4);
2673
2674 let data_point2 = find_gauge_datapoint_with_key_value(&gauge.data_points, "key2", "value2")
2675 .expect("datapoint with key2=value2 expected");
2676 assert_eq!(data_point2.value, 5);
2677 }
2678
2679 fn counter_aggregation_helper(temporality: Temporality) {
2680 let mut test_context = TestContext::new(temporality);
2682 let counter = test_context.u64_counter("test", "my_counter", None);
2683
2684 counter.add(1, &[KeyValue::new("key1", "value1")]);
2686 counter.add(1, &[KeyValue::new("key1", "value1")]);
2687 counter.add(1, &[KeyValue::new("key1", "value1")]);
2688 counter.add(1, &[KeyValue::new("key1", "value1")]);
2689 counter.add(1, &[KeyValue::new("key1", "value1")]);
2690
2691 counter.add(1, &[KeyValue::new("key1", "value2")]);
2692 counter.add(1, &[KeyValue::new("key1", "value2")]);
2693 counter.add(1, &[KeyValue::new("key1", "value2")]);
2694
2695 test_context.flush_metrics();
2696
2697 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2699 unreachable!()
2700 };
2701 assert_eq!(sum.data_points.len(), 2);
2703 assert!(sum.is_monotonic, "Counter should produce monotonic.");
2704 if let Temporality::Cumulative = temporality {
2705 assert_eq!(
2706 sum.temporality,
2707 Temporality::Cumulative,
2708 "Should produce cumulative"
2709 );
2710 } else {
2711 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
2712 }
2713
2714 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2716 .expect("datapoint with key1=value1 expected");
2717 assert_eq!(data_point1.value, 5);
2718
2719 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2720 .expect("datapoint with key1=value2 expected");
2721 assert_eq!(data_point1.value, 3);
2722
2723 test_context.reset_metrics();
2725 counter.add(1, &[KeyValue::new("key1", "value1")]);
2726 counter.add(1, &[KeyValue::new("key1", "value1")]);
2727 counter.add(1, &[KeyValue::new("key1", "value1")]);
2728 counter.add(1, &[KeyValue::new("key1", "value1")]);
2729 counter.add(1, &[KeyValue::new("key1", "value1")]);
2730
2731 counter.add(1, &[KeyValue::new("key1", "value2")]);
2732 counter.add(1, &[KeyValue::new("key1", "value2")]);
2733 counter.add(1, &[KeyValue::new("key1", "value2")]);
2734
2735 test_context.flush_metrics();
2736
2737 let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
2738 unreachable!()
2739 };
2740 assert_eq!(sum.data_points.len(), 2);
2741 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
2742 .expect("datapoint with key1=value1 expected");
2743 if temporality == Temporality::Cumulative {
2744 assert_eq!(data_point1.value, 10);
2745 } else {
2746 assert_eq!(data_point1.value, 5);
2747 }
2748
2749 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
2750 .expect("datapoint with key1=value2 expected");
2751 if temporality == Temporality::Cumulative {
2752 assert_eq!(data_point1.value, 6);
2753 } else {
2754 assert_eq!(data_point1.value, 3);
2755 }
2756 }
2757
    /// Verifies cardinality-overflow handling for a monotonic counter: once the
    /// series budget is saturated, additional *new* attribute sets are folded
    /// into the synthetic `otel.metric.overflow = true` series instead of
    /// creating new time series.
    fn counter_aggregation_overflow_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // 2000 distinct attribute sets — the assertions below show these
        // saturate the available cardinality (the next distinct sets overflow).
        for v in 0..2000 {
            counter.add(100, &[KeyValue::new("A", v.to_string())]);
        }

        // The empty attribute set still gets its own data point (asserted to
        // be 6 below), i.e. it is not folded into overflow.
        counter.add(3, &[]);
        counter.add(3, &[]);

        // Three attribute sets that are new after saturation: all 300 must go
        // to the overflow series.
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        // 2000 regular series + empty-attribute series + overflow series.
        assert_eq!(sum.data_points.len(), 2002);

        let data_point =
            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
        assert_eq!(data_point.value, 300);

        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
            .expect("Empty attributes point expected");
        assert!(
            empty_attrs_data_point.attributes.is_empty(),
            "Non-empty attribute set"
        );
        assert_eq!(
            empty_attrs_data_point.value, 6,
            "Empty attributes value should be 3+3=6"
        );

        // Phase 2: after a reset, record only the three previously-overflowed
        // attribute sets again.
        test_context.reset_metrics();
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        if temporality == Temporality::Delta {
            // Delta starts a fresh interval: cardinality is available again,
            // so the three series are reported individually.
            assert_eq!(sum.data_points.len(), 3);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
                .expect("point expected");
            assert_eq!(data_point.value, 100);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
                .expect("point expected");
            assert_eq!(data_point.value, 100);

            let data_point =
                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
                    .expect("point expected");
            assert_eq!(data_point.value, 100);
        } else {
            // Cumulative keeps the saturated series set: the new increments
            // still fold into overflow (300 + 300 = 600) and these attribute
            // sets never appear as their own series.
            assert_eq!(sum.data_points.len(), 2002);

            let data_point =
                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
            assert_eq!(data_point.value, 600);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
            assert!(data_point.is_none(), "point should not be present");

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
            assert!(data_point.is_none(), "point should not be present");

            let data_point =
                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
            assert!(data_point.is_none(), "point should not be present");
        }
    }
2849
    /// Same overflow scenario as `counter_aggregation_overflow_helper`, but
    /// with the cardinality limit raised to 2300 through a view, proving the
    /// per-stream limit from `with_cardinality_limit` is what governs overflow.
    fn counter_aggregation_overflow_helper_custom_limit(temporality: Temporality) {
        // Arrange: a view that applies a custom cardinality limit to
        // "my_counter" only; all other instruments are left untouched (None).
        let cardinality_limit = 2300;
        let view_change_cardinality = move |i: &Instrument| {
            if i.name == "my_counter" {
                Some(
                    Stream::builder()
                        .with_name("my_counter")
                        .with_cardinality_limit(cardinality_limit)
                        .build()
                        .unwrap(),
                )
            } else {
                None
            }
        };
        let mut test_context = TestContext::new_with_view(temporality, view_change_cardinality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Saturate the configured limit with distinct attribute sets.
        for v in 0..cardinality_limit {
            counter.add(100, &[KeyValue::new("A", v.to_string())]);
        }

        // The empty attribute set still gets its own data point (asserted to
        // be 6 below), i.e. it is not folded into overflow.
        counter.add(3, &[]);
        counter.add(3, &[]);

        // Three attribute sets that are new after saturation: all 300 must go
        // to the overflow series.
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        // limit regular series + empty-attribute series + overflow series.
        assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);

        let data_point =
            find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
        assert_eq!(data_point.value, 300);

        let empty_attrs_data_point = find_sum_datapoint_with_no_attributes(&sum.data_points)
            .expect("Empty attributes point expected");
        assert!(
            empty_attrs_data_point.attributes.is_empty(),
            "Non-empty attribute set"
        );
        assert_eq!(
            empty_attrs_data_point.value, 6,
            "Empty attributes value should be 3+3=6"
        );

        // Phase 2: after a reset, record only the three previously-overflowed
        // attribute sets again.
        test_context.reset_metrics();
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        if temporality == Temporality::Delta {
            // Delta starts a fresh interval: cardinality is available again,
            // so the three series are reported individually.
            assert_eq!(sum.data_points.len(), 3);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo")
                .expect("point expected");
            assert_eq!(data_point.value, 100);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another")
                .expect("point expected");
            assert_eq!(data_point.value, 100);

            let data_point =
                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another")
                    .expect("point expected");
            assert_eq!(data_point.value, 100);
        } else {
            // Cumulative keeps the saturated series set: the new increments
            // still fold into overflow (300 + 300 = 600) and these attribute
            // sets never appear as their own series.
            assert_eq!(sum.data_points.len(), cardinality_limit + 1 + 1);

            let data_point =
                find_overflow_sum_datapoint(&sum.data_points).expect("overflow point expected");
            assert_eq!(data_point.value, 600);

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "foo");
            assert!(data_point.is_none(), "point should not be present");

            let data_point = find_sum_datapoint_with_key_value(&sum.data_points, "A", "another");
            assert!(data_point.is_none(), "point should not be present");

            let data_point =
                find_sum_datapoint_with_key_value(&sum.data_points, "A", "yet_another");
            assert!(data_point.is_none(), "point should not be present");
        }
    }
2955
    /// Verifies that attribute ordering does not create distinct time series:
    /// all six permutations of {A=a, B=b, C=c} must fold into a single data
    /// point with the combined value. `start_sorted` controls whether the very
    /// first measurement uses sorted or unsorted key order, exercising both
    /// first-seen code paths.
    fn counter_aggregation_attribute_order_helper(temporality: Temporality, start_sorted: bool) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // First measurement: sorted (A, B, C) or unsorted (A, C, B) key order,
        // depending on the scenario under test.
        if start_sorted {
            counter.add(
                1,
                &[
                    KeyValue::new("A", "a"),
                    KeyValue::new("B", "b"),
                    KeyValue::new("C", "c"),
                ],
            );
        } else {
            counter.add(
                1,
                &[
                    KeyValue::new("A", "a"),
                    KeyValue::new("C", "c"),
                    KeyValue::new("B", "b"),
                ],
            );
        }

        // Remaining permutations of the same three attributes.
        counter.add(
            1,
            &[
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
                KeyValue::new("B", "b"),
            ],
        );
        test_context.flush_metrics();

        let MetricData::Sum(sum) = test_context.get_aggregation::<u64>("my_counter", None) else {
            unreachable!()
        };

        // Every permutation must collapse into one series carrying the total.
        assert_eq!(sum.data_points.len(), 1);

        let data_point1 = &sum.data_points[0];
        assert_eq!(data_point1.value, 6);
    }
3038
3039 fn updown_counter_aggregation_helper(temporality: Temporality) {
3040 let mut test_context = TestContext::new(temporality);
3042 let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
3043
3044 counter.add(10, &[KeyValue::new("key1", "value1")]);
3046 counter.add(-1, &[KeyValue::new("key1", "value1")]);
3047 counter.add(-5, &[KeyValue::new("key1", "value1")]);
3048 counter.add(0, &[KeyValue::new("key1", "value1")]);
3049 counter.add(1, &[KeyValue::new("key1", "value1")]);
3050
3051 counter.add(10, &[KeyValue::new("key1", "value2")]);
3052 counter.add(0, &[KeyValue::new("key1", "value2")]);
3053 counter.add(-3, &[KeyValue::new("key1", "value2")]);
3054
3055 test_context.flush_metrics();
3056
3057 let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3059 else {
3060 unreachable!()
3061 };
3062 assert_eq!(sum.data_points.len(), 2);
3064 assert!(
3065 !sum.is_monotonic,
3066 "UpDownCounter should produce non-monotonic."
3067 );
3068 assert_eq!(
3069 sum.temporality,
3070 Temporality::Cumulative,
3071 "Should produce Cumulative for UpDownCounter"
3072 );
3073
3074 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3076 .expect("datapoint with key1=value1 expected");
3077 assert_eq!(data_point1.value, 5);
3078
3079 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3080 .expect("datapoint with key1=value2 expected");
3081 assert_eq!(data_point1.value, 7);
3082
3083 test_context.reset_metrics();
3085 counter.add(10, &[KeyValue::new("key1", "value1")]);
3086 counter.add(-1, &[KeyValue::new("key1", "value1")]);
3087 counter.add(-5, &[KeyValue::new("key1", "value1")]);
3088 counter.add(0, &[KeyValue::new("key1", "value1")]);
3089 counter.add(1, &[KeyValue::new("key1", "value1")]);
3090
3091 counter.add(10, &[KeyValue::new("key1", "value2")]);
3092 counter.add(0, &[KeyValue::new("key1", "value2")]);
3093 counter.add(-3, &[KeyValue::new("key1", "value2")]);
3094
3095 test_context.flush_metrics();
3096
3097 let MetricData::Sum(sum) = test_context.get_aggregation::<i64>("my_updown_counter", None)
3098 else {
3099 unreachable!()
3100 };
3101 assert_eq!(sum.data_points.len(), 2);
3102 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value1")
3103 .expect("datapoint with key1=value1 expected");
3104 assert_eq!(data_point1.value, 10);
3105
3106 let data_point1 = find_sum_datapoint_with_key_value(&sum.data_points, "key1", "value2")
3107 .expect("datapoint with key1=value2 expected");
3108 assert_eq!(data_point1.value, 14);
3109 }
3110
3111 fn find_sum_datapoint_with_key_value<'a, T>(
3112 data_points: &'a [SumDataPoint<T>],
3113 key: &str,
3114 value: &str,
3115 ) -> Option<&'a SumDataPoint<T>> {
3116 data_points.iter().find(|&datapoint| {
3117 datapoint
3118 .attributes
3119 .iter()
3120 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3121 })
3122 }
3123
3124 fn find_overflow_sum_datapoint<T>(data_points: &[SumDataPoint<T>]) -> Option<&SumDataPoint<T>> {
3125 data_points.iter().find(|&datapoint| {
3126 datapoint.attributes.iter().any(|kv| {
3127 kv.key.as_str() == "otel.metric.overflow" && kv.value == Value::Bool(true)
3128 })
3129 })
3130 }
3131
3132 fn find_gauge_datapoint_with_key_value<'a, T>(
3133 data_points: &'a [GaugeDataPoint<T>],
3134 key: &str,
3135 value: &str,
3136 ) -> Option<&'a GaugeDataPoint<T>> {
3137 data_points.iter().find(|&datapoint| {
3138 datapoint
3139 .attributes
3140 .iter()
3141 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3142 })
3143 }
3144
3145 fn find_sum_datapoint_with_no_attributes<T>(
3146 data_points: &[SumDataPoint<T>],
3147 ) -> Option<&SumDataPoint<T>> {
3148 data_points
3149 .iter()
3150 .find(|&datapoint| datapoint.attributes.is_empty())
3151 }
3152
3153 fn find_gauge_datapoint_with_no_attributes<T>(
3154 data_points: &[GaugeDataPoint<T>],
3155 ) -> Option<&GaugeDataPoint<T>> {
3156 data_points
3157 .iter()
3158 .find(|&datapoint| datapoint.attributes.is_empty())
3159 }
3160
3161 fn find_histogram_datapoint_with_key_value<'a, T>(
3162 data_points: &'a [HistogramDataPoint<T>],
3163 key: &str,
3164 value: &str,
3165 ) -> Option<&'a HistogramDataPoint<T>> {
3166 data_points.iter().find(|&datapoint| {
3167 datapoint
3168 .attributes
3169 .iter()
3170 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
3171 })
3172 }
3173
3174 fn find_histogram_datapoint_with_no_attributes<T>(
3175 data_points: &[HistogramDataPoint<T>],
3176 ) -> Option<&HistogramDataPoint<T>> {
3177 data_points
3178 .iter()
3179 .find(|&datapoint| datapoint.attributes.is_empty())
3180 }
3181
3182 fn find_scope_metric<'a>(
3183 metrics: &'a [ScopeMetrics],
3184 name: &'a str,
3185 ) -> Option<&'a ScopeMetrics> {
3186 metrics
3187 .iter()
3188 .find(|&scope_metric| scope_metric.scope.name() == name)
3189 }
3190
    /// Shared fixture for the aggregation tests: wires an in-memory exporter to
    /// a periodic reader so tests can flush and then inspect exported metrics.
    struct TestContext {
        // Captures everything the periodic reader exports.
        exporter: InMemoryMetricExporter,
        meter_provider: SdkMeterProvider,

        // Result of the most recent `get_finished_metrics` call, stored on the
        // struct so `get_aggregation` can hand out references into it.
        resource_metrics: Vec<ResourceMetrics>,
    }
3198
3199 impl TestContext {
3200 fn new(temporality: Temporality) -> Self {
3201 let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3202 let exporter = exporter.build();
3203 let meter_provider = SdkMeterProvider::builder()
3204 .with_periodic_exporter(exporter.clone())
3205 .build();
3206
3207 TestContext {
3208 exporter,
3209 meter_provider,
3210 resource_metrics: vec![],
3211 }
3212 }
3213
3214 fn new_with_view<T>(temporality: Temporality, view: T) -> Self
3215 where
3216 T: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
3217 {
3218 let exporter = InMemoryMetricExporterBuilder::new().with_temporality(temporality);
3219 let exporter = exporter.build();
3220 let meter_provider = SdkMeterProvider::builder()
3221 .with_periodic_exporter(exporter.clone())
3222 .with_view(view)
3223 .build();
3224
3225 TestContext {
3226 exporter,
3227 meter_provider,
3228 resource_metrics: vec![],
3229 }
3230 }
3231
3232 fn u64_counter(
3233 &self,
3234 meter_name: &'static str,
3235 counter_name: &'static str,
3236 unit: Option<&'static str>,
3237 ) -> Counter<u64> {
3238 let meter = self.meter_provider.meter(meter_name);
3239 let mut counter_builder = meter.u64_counter(counter_name);
3240 if let Some(unit_name) = unit {
3241 counter_builder = counter_builder.with_unit(unit_name);
3242 }
3243 counter_builder.build()
3244 }
3245
3246 fn i64_up_down_counter(
3247 &self,
3248 meter_name: &'static str,
3249 counter_name: &'static str,
3250 unit: Option<&'static str>,
3251 ) -> UpDownCounter<i64> {
3252 let meter = self.meter_provider.meter(meter_name);
3253 let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
3254 if let Some(unit_name) = unit {
3255 updown_counter_builder = updown_counter_builder.with_unit(unit_name);
3256 }
3257 updown_counter_builder.build()
3258 }
3259
3260 fn meter(&self) -> Meter {
3261 self.meter_provider.meter("test")
3262 }
3263
3264 fn flush_metrics(&self) {
3265 self.meter_provider.force_flush().unwrap();
3266 }
3267
3268 fn reset_metrics(&self) {
3269 self.exporter.reset();
3270 }
3271
3272 fn check_no_metrics(&self) {
3273 let resource_metrics = self
3274 .exporter
3275 .get_finished_metrics()
3276 .expect("metrics expected to be exported"); assert!(resource_metrics.is_empty(), "no metrics should be exported");
3279 }
3280
3281 fn get_aggregation<T: Number>(
3282 &mut self,
3283 counter_name: &str,
3284 unit_name: Option<&str>,
3285 ) -> &MetricData<T> {
3286 self.resource_metrics = self
3287 .exporter
3288 .get_finished_metrics()
3289 .expect("metrics expected to be exported");
3290
3291 assert!(
3292 !self.resource_metrics.is_empty(),
3293 "no metrics were exported"
3294 );
3295
3296 assert!(
3297 self.resource_metrics.len() == 1,
3298 "Expected single resource metrics."
3299 );
3300 let resource_metric = self
3301 .resource_metrics
3302 .first()
3303 .expect("This should contain exactly one resource metric, as validated above.");
3304
3305 assert!(
3306 !resource_metric.scope_metrics.is_empty(),
3307 "No scope metrics in latest export"
3308 );
3309 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3310
3311 let metric = &resource_metric.scope_metrics[0].metrics[0];
3312 assert_eq!(metric.name, counter_name);
3313 if let Some(expected_unit) = unit_name {
3314 assert_eq!(metric.unit, expected_unit);
3315 }
3316
3317 T::extract_metrics_data_ref(&metric.data)
3318 .expect("Failed to cast aggregation to expected type")
3319 }
3320
3321 fn get_from_multiple_aggregations<T: Number>(
3322 &mut self,
3323 counter_name: &str,
3324 unit_name: Option<&str>,
3325 invocation_count: usize,
3326 ) -> Vec<&MetricData<T>> {
3327 self.resource_metrics = self
3328 .exporter
3329 .get_finished_metrics()
3330 .expect("metrics expected to be exported");
3331
3332 assert!(
3333 !self.resource_metrics.is_empty(),
3334 "no metrics were exported"
3335 );
3336
3337 assert_eq!(
3338 self.resource_metrics.len(),
3339 invocation_count,
3340 "Expected collect to be called {} times",
3341 invocation_count
3342 );
3343
3344 let result = self
3345 .resource_metrics
3346 .iter()
3347 .map(|resource_metric| {
3348 assert!(
3349 !resource_metric.scope_metrics.is_empty(),
3350 "An export with no scope metrics occurred"
3351 );
3352
3353 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
3354
3355 let metric = &resource_metric.scope_metrics[0].metrics[0];
3356 assert_eq!(metric.name, counter_name);
3357
3358 if let Some(expected_unit) = unit_name {
3359 assert_eq!(metric.unit, expected_unit);
3360 }
3361
3362 let aggregation = T::extract_metrics_data_ref(&metric.data)
3363 .expect("Failed to cast aggregation to expected type");
3364 aggregation
3365 })
3366 .collect::<Vec<_>>();
3367
3368 result
3369 }
3370 }
3371}