opentelemetry_sdk/metrics/
in_memory_exporter.rs1use crate::error::{OTelSdkError, OTelSdkResult};
2use crate::metrics::data::{
3 ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
4};
5use crate::metrics::exporter::PushMetricExporter;
6use crate::metrics::Temporality;
7use crate::InMemoryExporterError;
8use std::collections::VecDeque;
9use std::fmt;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12
13use super::data::{AggregatedMetrics, Metric, ScopeMetrics};
14
15pub struct InMemoryMetricExporter {
63 metrics: Arc<Mutex<VecDeque<ResourceMetrics>>>,
64 temporality: Temporality,
65}
66
67impl Clone for InMemoryMetricExporter {
68 fn clone(&self) -> Self {
69 InMemoryMetricExporter {
70 metrics: self.metrics.clone(),
71 temporality: self.temporality,
72 }
73 }
74}
75
76impl fmt::Debug for InMemoryMetricExporter {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("InMemoryMetricExporter").finish()
79 }
80}
81
82impl Default for InMemoryMetricExporter {
83 fn default() -> Self {
84 InMemoryMetricExporterBuilder::new().build()
85 }
86}
87
88pub struct InMemoryMetricExporterBuilder {
97 temporality: Option<Temporality>,
98}
99
100impl fmt::Debug for InMemoryMetricExporterBuilder {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 f.debug_struct("InMemoryMetricExporterBuilder").finish()
103 }
104}
105
106impl Default for InMemoryMetricExporterBuilder {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl InMemoryMetricExporterBuilder {
113 pub fn new() -> Self {
115 Self { temporality: None }
116 }
117
118 pub fn with_temporality(mut self, temporality: Temporality) -> Self {
120 self.temporality = Some(temporality);
121 self
122 }
123
124 pub fn build(self) -> InMemoryMetricExporter {
127 InMemoryMetricExporter {
128 metrics: Arc::new(Mutex::new(VecDeque::new())),
129 temporality: self.temporality.unwrap_or_default(),
130 }
131 }
132}
133
134impl InMemoryMetricExporter {
135 pub fn get_finished_metrics(&self) -> Result<Vec<ResourceMetrics>, InMemoryExporterError> {
150 let metrics = self
151 .metrics
152 .lock()
153 .map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect())
154 .map_err(InMemoryExporterError::from)?;
155 Ok(metrics)
156 }
157
158 pub fn reset(&self) {
169 let _ = self
170 .metrics
171 .lock()
172 .map(|mut metrics_guard| metrics_guard.clear());
173 }
174
175 fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics {
176 ResourceMetrics {
177 resource: metric.resource.clone(),
178 scope_metrics: metric
179 .scope_metrics
180 .iter()
181 .map(|scope_metric| ScopeMetrics {
182 scope: scope_metric.scope.clone(),
183 metrics: scope_metric
184 .metrics
185 .iter()
186 .map(|metric| Metric {
187 name: metric.name.clone(),
188 description: metric.description.clone(),
189 unit: metric.unit.clone(),
190 data: Self::clone_data(&metric.data),
191 })
192 .collect(),
193 })
194 .collect(),
195 }
196 }
197
198 fn clone_data(data: &AggregatedMetrics) -> AggregatedMetrics {
199 fn clone_inner<T: Clone>(data: &MetricData<T>) -> MetricData<T> {
200 match data {
201 MetricData::Gauge(gauge) => Gauge {
202 data_points: gauge.data_points.clone(),
203 start_time: gauge.start_time,
204 time: gauge.time,
205 }
206 .into(),
207 MetricData::Sum(sum) => Sum {
208 data_points: sum.data_points.clone(),
209 start_time: sum.start_time,
210 time: sum.time,
211 temporality: sum.temporality,
212 is_monotonic: sum.is_monotonic,
213 }
214 .into(),
215 MetricData::Histogram(histogram) => Histogram {
216 data_points: histogram.data_points.clone(),
217 start_time: histogram.start_time,
218 time: histogram.time,
219 temporality: histogram.temporality,
220 }
221 .into(),
222 MetricData::ExponentialHistogram(exponential_histogram) => ExponentialHistogram {
223 data_points: exponential_histogram.data_points.clone(),
224 start_time: exponential_histogram.start_time,
225 time: exponential_histogram.time,
226 temporality: exponential_histogram.temporality,
227 }
228 .into(),
229 }
230 }
231 match data {
232 AggregatedMetrics::F64(metric_data) => AggregatedMetrics::F64(clone_inner(metric_data)),
233 AggregatedMetrics::U64(metric_data) => AggregatedMetrics::U64(clone_inner(metric_data)),
234 AggregatedMetrics::I64(metric_data) => AggregatedMetrics::I64(clone_inner(metric_data)),
235 }
236 }
237}
238
239impl PushMetricExporter for InMemoryMetricExporter {
240 async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
241 self.metrics
242 .lock()
243 .map(|mut metrics_guard| {
244 metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
245 })
246 .map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
247 }
248
249 fn force_flush(&self) -> OTelSdkResult {
250 Ok(()) }
252
253 fn shutdown(&self) -> OTelSdkResult {
254 Ok(())
255 }
256
257 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
258 Ok(())
259 }
260
261 fn temporality(&self) -> Temporality {
262 self.temporality
263 }
264}