// opentelemetry_sdk/metrics/in_memory_exporter.rs

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::metrics::data::{
    ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
};
use crate::metrics::exporter::PushMetricExporter;
use crate::metrics::Temporality;
use crate::InMemoryExporterError;
use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use super::data::{AggregatedMetrics, Metric, ScopeMetrics};

/// An in-memory metrics exporter, intended for testing and debugging.
///
/// Each exported batch is copied and appended to a
/// `VecDeque<ResourceMetrics>` held behind an `Arc<Mutex<_>>`. Clones of the
/// exporter share that storage, so a clone can be handed to a reader while the
/// original is used to inspect the results with
/// [`get_finished_metrics`](InMemoryMetricExporter::get_finished_metrics).
///
/// # Lock poisoning
///
/// None of the methods on this type unwrap the result of locking the storage:
/// `get_finished_metrics` and `export` return an error when the `Mutex` is
/// poisoned, and `reset` leaves the storage untouched.
///
/// # Example
///
/// ```
/// # use opentelemetry_sdk::metrics;
/// # use opentelemetry::{KeyValue};
/// # use opentelemetry::metrics::MeterProvider;
/// # use opentelemetry_sdk::metrics::InMemoryMetricExporter;
/// # use opentelemetry_sdk::metrics::PeriodicReader;
///
/// # #[tokio::main]
/// # async fn main() {
/// // Create an InMemoryMetricExporter
/// let exporter = InMemoryMetricExporter::default();
///
/// // Create a MeterProvider and register the exporter
/// let meter_provider = metrics::SdkMeterProvider::builder()
///     .with_reader(PeriodicReader::builder(exporter.clone()).build())
///     .build();
///
/// // Create and record metrics using the MeterProvider
/// let meter = meter_provider.meter("example");
/// let counter = meter.u64_counter("my_counter").build();
/// counter.add(1, &[KeyValue::new("key", "value")]);
///
/// meter_provider.force_flush().unwrap();
///
/// // Retrieve the finished metrics from the exporter
/// let finished_metrics = exporter.get_finished_metrics().unwrap();
///
/// // Print the finished metrics
/// for resource_metrics in finished_metrics {
///     println!("{:?}", resource_metrics);
/// }
/// # }
/// ```
pub struct InMemoryMetricExporter {
    /// Exported batches in export order (oldest at the front); shared by all
    /// clones of the exporter.
    metrics: Arc<Mutex<VecDeque<ResourceMetrics>>>,
    /// Value returned from `PushMetricExporter::temporality`.
    temporality: Temporality,
}

67impl Clone for InMemoryMetricExporter {
68    fn clone(&self) -> Self {
69        InMemoryMetricExporter {
70            metrics: self.metrics.clone(),
71            temporality: self.temporality,
72        }
73    }
74}
75
76impl fmt::Debug for InMemoryMetricExporter {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("InMemoryMetricExporter").finish()
79    }
80}
81
82impl Default for InMemoryMetricExporter {
83    fn default() -> Self {
84        InMemoryMetricExporterBuilder::new().build()
85    }
86}
87
/// Builder for [`InMemoryMetricExporter`].
///
/// # Example
///
/// ```
/// # use opentelemetry_sdk::metrics::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
///
/// let exporter = InMemoryMetricExporterBuilder::new().build();
/// ```
pub struct InMemoryMetricExporterBuilder {
    /// Temporality chosen with `with_temporality`; when `None`, `build` falls
    /// back to `Temporality::default()`.
    temporality: Option<Temporality>,
}

100impl fmt::Debug for InMemoryMetricExporterBuilder {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        f.debug_struct("InMemoryMetricExporterBuilder").finish()
103    }
104}
105
106impl Default for InMemoryMetricExporterBuilder {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl InMemoryMetricExporterBuilder {
113    /// Creates a new instance of the `InMemoryMetricExporterBuilder`.
114    pub fn new() -> Self {
115        Self { temporality: None }
116    }
117
118    /// Set the [Temporality] of the exporter.
119    pub fn with_temporality(mut self, temporality: Temporality) -> Self {
120        self.temporality = Some(temporality);
121        self
122    }
123
124    /// Creates a new instance of the `InMemoryMetricExporter`.
125    ///
126    pub fn build(self) -> InMemoryMetricExporter {
127        InMemoryMetricExporter {
128            metrics: Arc::new(Mutex::new(VecDeque::new())),
129            temporality: self.temporality.unwrap_or_default(),
130        }
131    }
132}
133
134impl InMemoryMetricExporter {
135    /// Returns the finished metrics as a vector of `ResourceMetrics`.
136    ///
137    /// # Errors
138    ///
139    /// Returns a `MetricError` if the internal lock cannot be acquired.
140    ///
141    /// # Example
142    ///
143    /// ```
144    /// # use opentelemetry_sdk::metrics::InMemoryMetricExporter;
145    ///
146    /// let exporter = InMemoryMetricExporter::default();
147    /// let finished_metrics = exporter.get_finished_metrics().unwrap();
148    /// ```
149    pub fn get_finished_metrics(&self) -> Result<Vec<ResourceMetrics>, InMemoryExporterError> {
150        let metrics = self
151            .metrics
152            .lock()
153            .map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect())
154            .map_err(InMemoryExporterError::from)?;
155        Ok(metrics)
156    }
157
158    /// Clears the internal storage of finished metrics.
159    ///
160    /// # Example
161    ///
162    /// ```
163    /// # use opentelemetry_sdk::metrics::InMemoryMetricExporter;
164    ///
165    /// let exporter = InMemoryMetricExporter::default();
166    /// exporter.reset();
167    /// ```
168    pub fn reset(&self) {
169        let _ = self
170            .metrics
171            .lock()
172            .map(|mut metrics_guard| metrics_guard.clear());
173    }
174
175    fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics {
176        ResourceMetrics {
177            resource: metric.resource.clone(),
178            scope_metrics: metric
179                .scope_metrics
180                .iter()
181                .map(|scope_metric| ScopeMetrics {
182                    scope: scope_metric.scope.clone(),
183                    metrics: scope_metric
184                        .metrics
185                        .iter()
186                        .map(|metric| Metric {
187                            name: metric.name.clone(),
188                            description: metric.description.clone(),
189                            unit: metric.unit.clone(),
190                            data: Self::clone_data(&metric.data),
191                        })
192                        .collect(),
193                })
194                .collect(),
195        }
196    }
197
198    fn clone_data(data: &AggregatedMetrics) -> AggregatedMetrics {
199        fn clone_inner<T: Clone>(data: &MetricData<T>) -> MetricData<T> {
200            match data {
201                MetricData::Gauge(gauge) => Gauge {
202                    data_points: gauge.data_points.clone(),
203                    start_time: gauge.start_time,
204                    time: gauge.time,
205                }
206                .into(),
207                MetricData::Sum(sum) => Sum {
208                    data_points: sum.data_points.clone(),
209                    start_time: sum.start_time,
210                    time: sum.time,
211                    temporality: sum.temporality,
212                    is_monotonic: sum.is_monotonic,
213                }
214                .into(),
215                MetricData::Histogram(histogram) => Histogram {
216                    data_points: histogram.data_points.clone(),
217                    start_time: histogram.start_time,
218                    time: histogram.time,
219                    temporality: histogram.temporality,
220                }
221                .into(),
222                MetricData::ExponentialHistogram(exponential_histogram) => ExponentialHistogram {
223                    data_points: exponential_histogram.data_points.clone(),
224                    start_time: exponential_histogram.start_time,
225                    time: exponential_histogram.time,
226                    temporality: exponential_histogram.temporality,
227                }
228                .into(),
229            }
230        }
231        match data {
232            AggregatedMetrics::F64(metric_data) => AggregatedMetrics::F64(clone_inner(metric_data)),
233            AggregatedMetrics::U64(metric_data) => AggregatedMetrics::U64(clone_inner(metric_data)),
234            AggregatedMetrics::I64(metric_data) => AggregatedMetrics::I64(clone_inner(metric_data)),
235        }
236    }
237}
238
239impl PushMetricExporter for InMemoryMetricExporter {
240    async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
241        self.metrics
242            .lock()
243            .map(|mut metrics_guard| {
244                metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
245            })
246            .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
247    }
248
249    fn force_flush(&self) -> OTelSdkResult {
250        Ok(()) // In this implementation, flush does nothing
251    }
252
253    fn shutdown(&self) -> OTelSdkResult {
254        Ok(())
255    }
256
257    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
258        Ok(())
259    }
260
261    fn temporality(&self) -> Temporality {
262        self.temporality
263    }
264}