opentelemetry_sdk/trace/span_processor.rs

1//! # OpenTelemetry Span Processor Interface
2//!
3//! Span processor is an interface which allows hooks for span start and end method
4//! invocations. The span processors are invoked only when
5//! [`is_recording`] is true.
6//!
7//! Built-in span processors are responsible for batching and conversion of spans to
8//! exportable representation and passing batches to exporters.
9//!
10//! Span processors can be registered directly on SDK [`TracerProvider`] and they are
11//! invoked in the same order as they were registered.
12//!
13//! All `Tracer` instances created by a `TracerProvider` share the same span processors.
14//! Changes to this collection reflect in all `Tracer` instances.
15//!
16//! The following diagram shows `SpanProcessor`'s relationship to other components
17//! in the SDK:
18//!
19//! ```ascii
20//!   +-----+--------------+   +-----------------------+   +-------------------+
21//!   |     |              |   |                       |   |                   |
22//!   |     |              |   | (Batch)SpanProcessor  |   |    SpanExporter   |
23//!   |     |              +---> (Simple)SpanProcessor +--->  (OTLPExporter)   |
24//!   |     |              |   |                       |   |                   |
25//!   | SDK | Tracer.span()|   +-----------------------+   +-------------------+
26//!   |     | Span.end()   |
27//!   |     |              |
28//!   |     |              |
29//!   |     |              |
30//!   |     |              |
31//!   +-----+--------------+
32//! ```
33//!
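//! Processors are registered at provider construction time. As a minimal
//! sketch (`my_exporter` is a placeholder for any [`SpanExporter`]
//! implementation):
//!
//! ```ignore
//! use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider};
//!
//! let provider = SdkTracerProvider::builder()
//!     // Processors are invoked in registration order on span start/end.
//!     .with_span_processor(BatchSpanProcessor::builder(my_exporter).build())
//!     .build();
//! ```
//!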
34//! [`is_recording`]: opentelemetry::trace::Span::is_recording()
35//! [`TracerProvider`]: opentelemetry::trace::TracerProvider
36
37use crate::error::{OTelSdkError, OTelSdkResult};
38use crate::resource::Resource;
39use crate::trace::Span;
40use crate::trace::{SpanData, SpanExporter};
41use opentelemetry::Context;
42use opentelemetry::{otel_debug, otel_error, otel_warn};
43use std::cmp::min;
44use std::sync::atomic::{AtomicUsize, Ordering};
45use std::sync::{Arc, Mutex};
46use std::{env, str::FromStr, time::Duration};
47
48use std::sync::atomic::AtomicBool;
49use std::thread;
50use std::time::Instant;
51
52/// Delay interval between two consecutive exports.
53pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
54/// Default delay interval between two consecutive exports.
55pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(5_000);
56/// Maximum queue size
57pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
58/// Default maximum queue size
59pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
60/// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE
61pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
62/// Default maximum batch size
63pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
64/// Maximum allowed time to export data.
65pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
66/// Default maximum allowed time to export data.
67pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
68/// Environment variable to configure max concurrent exports for batch span
69/// processor.
70pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
71/// Default max concurrent exports for BSP
72pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
73
74/// `SpanProcessor` is an interface which allows hooks for span start and end
75/// method invocations. The span processors are invoked only when is_recording
76/// is true.
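///
/// A minimal custom implementation might look like the sketch below (the
/// type name and the no-op bodies are illustrative only):
///
/// ```ignore
/// use opentelemetry::Context;
/// use opentelemetry_sdk::error::OTelSdkResult;
/// use opentelemetry_sdk::trace::{Span, SpanData, SpanProcessor};
/// use std::time::Duration;
///
/// #[derive(Debug)]
/// struct NoOpProcessor;
///
/// impl SpanProcessor for NoOpProcessor {
///     fn on_start(&self, _span: &mut Span, _cx: &Context) {}
///     fn on_end(&self, _span: SpanData) {}
///     fn force_flush(&self) -> OTelSdkResult {
///         Ok(())
///     }
///     fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
///         Ok(())
///     }
/// }
/// ```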
77pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
78    /// `on_start` is called when a `Span` is started.  This method is called
79    /// synchronously on the thread that started the span, therefore it should
80    /// not block or throw exceptions.
81    fn on_start(&self, span: &mut Span, cx: &Context);
82    /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
83    /// already set). This method is called synchronously within the `Span::end`
84    /// API, therefore it should not block or throw an exception.
    /// TODO - This method should take a reference to `SpanData`
86    fn on_end(&self, span: SpanData);
    /// Forces any buffered spans to be exported.
88    fn force_flush(&self) -> OTelSdkResult;
89    /// Shuts down the processor. Called when SDK is shut down. This is an
90    /// opportunity for processors to do any cleanup required.
91    ///
    /// Implementations should ensure that shutdown can be called multiple times.
93    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
    /// Shuts down the processor with a default timeout.
95    fn shutdown(&self) -> OTelSdkResult {
96        self.shutdown_with_timeout(Duration::from_secs(5))
97    }
98    /// Set the resource for the span processor.
99    fn set_resource(&mut self, _resource: &Resource) {}
100}
101
102/// A [SpanProcessor] that passes finished spans to the configured
103/// `SpanExporter`, as soon as they are finished, without any batching. This is
104/// typically useful for debugging and testing. For scenarios requiring higher
105/// performance/throughput, consider using [BatchSpanProcessor].
/// Spans are exported synchronously on the thread that ends the span.
108/// When using this processor with the OTLP Exporter, the following exporter
109/// features are supported:
110/// - `grpc-tonic`: This requires TracerProvider to be created within a tokio
111///   runtime. Spans can be emitted from any thread, including tokio runtime
112///   threads.
113/// - `reqwest-blocking-client`: TracerProvider may be created anywhere, but
114///   spans must be emitted from a non-tokio runtime thread.
115/// - `reqwest-client`: TracerProvider may be created anywhere, but spans must be
116///   emitted from a tokio runtime thread.
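///
/// A minimal usage sketch, wiring the processor into an `SdkTracerProvider`
/// (here `my_exporter` is a placeholder for any concrete `SpanExporter`):
///
/// ```ignore
/// use opentelemetry::global;
/// use opentelemetry::trace::Tracer as _;
/// use opentelemetry_sdk::trace::{SdkTracerProvider, SimpleSpanProcessor};
///
/// let provider = SdkTracerProvider::builder()
///     .with_span_processor(SimpleSpanProcessor::new(my_exporter))
///     .build();
/// global::set_tracer_provider(provider.clone());
///
/// let tracer = global::tracer("example");
/// tracer.in_span("work", |_cx| {
///     // The span is exported synchronously as soon as this closure returns.
/// });
/// provider.shutdown().unwrap();
/// ```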
117#[derive(Debug)]
118pub struct SimpleSpanProcessor<T: SpanExporter> {
119    exporter: Mutex<T>,
120}
121
122impl<T: SpanExporter> SimpleSpanProcessor<T> {
123    /// Create a new [SimpleSpanProcessor] using the provided exporter.
124    pub fn new(exporter: T) -> Self {
125        Self {
126            exporter: Mutex::new(exporter),
127        }
128    }
129}
130
131impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
132    fn on_start(&self, _span: &mut Span, _cx: &Context) {
133        // Ignored
134    }
135
136    fn on_end(&self, span: SpanData) {
137        if !span.span_context.is_sampled() {
138            return;
139        }
140
141        let result = self
142            .exporter
143            .lock()
144            .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
145            .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));
146
147        if let Err(err) = result {
148            // TODO: check error type, and log `error` only if the error is user-actionable, else log `debug`
149            otel_debug!(
150                name: "SimpleProcessor.OnEnd.Error",
151                reason = format!("{:?}", err)
152            );
153        }
154    }
155
156    fn force_flush(&self) -> OTelSdkResult {
157        // Nothing to flush for simple span processor.
158        Ok(())
159    }
160
161    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
162        if let Ok(mut exporter) = self.exporter.lock() {
163            exporter.shutdown_with_timeout(timeout)
164        } else {
165            Err(OTelSdkError::InternalFailure(
166                "SimpleSpanProcessor mutex poison at shutdown".into(),
167            ))
168        }
169    }
170
171    fn set_resource(&mut self, resource: &Resource) {
172        if let Ok(mut exporter) = self.exporter.lock() {
173            exporter.set_resource(resource);
174        }
175    }
176}
177
178/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
179/// in batches to the configured `SpanExporter`. This processor is ideal for
180/// high-throughput environments, as it minimizes the overhead of exporting spans
181/// individually. It uses a **dedicated background thread** to manage and export spans
182/// asynchronously, ensuring that the application's main execution flow is not blocked.
183///
184/// When using this processor with the OTLP Exporter, the following exporter
185/// features are supported:
186/// - `grpc-tonic`: This requires `TracerProvider` to be created within a tokio
187///   runtime.
188/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
189///
190/// In other words, other clients like `reqwest` and `hyper` are not supported.
///
/// # Example
192///
193/// This example demonstrates how to configure and use the `BatchSpanProcessor`
194/// with a custom configuration. Note that a dedicated thread is used internally
195/// to manage the export process.
196///
197/// ```rust
198/// use opentelemetry::global;
199/// use opentelemetry_sdk::{
200///     trace::{BatchSpanProcessor, BatchConfigBuilder, SdkTracerProvider},
202///     testing::trace::NoopSpanExporter,
203/// };
204/// use opentelemetry::trace::Tracer as _;
205/// use opentelemetry::trace::Span;
206/// use std::time::Duration;
207///
208/// fn main() {
209///     // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration).
210///     let exporter = NoopSpanExporter::new();
211///
212///     // Step 2: Configure the BatchSpanProcessor.
213///     let batch_processor = BatchSpanProcessor::builder(exporter)
214///         .with_batch_config(
215///             BatchConfigBuilder::default()
216///                 .with_max_queue_size(1024) // Buffer up to 1024 spans.
217///                 .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
218///                 .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
219///                 .build(),
220///         )
221///         .build();
222///
223///     // Step 3: Set up a TracerProvider with the configured processor.
224///     let provider = SdkTracerProvider::builder()
225///         .with_span_processor(batch_processor)
226///         .build();
227///     global::set_tracer_provider(provider.clone());
228///
229///     // Step 4: Create spans and record operations.
230///     let tracer = global::tracer("example-tracer");
231///     let mut span = tracer.start("example-span");
232///     span.end(); // Mark the span as completed.
233///
234///     // Step 5: Ensure all spans are flushed before exiting.
235///     provider.shutdown();
236/// }
237/// ```
238use std::sync::mpsc::sync_channel;
239use std::sync::mpsc::Receiver;
240use std::sync::mpsc::RecvTimeoutError;
241use std::sync::mpsc::SyncSender;
242
243/// Messages exchanged between the main thread and the background thread.
244#[allow(clippy::large_enum_variant)]
245#[derive(Debug)]
246enum BatchMessage {
247    //ExportSpan(SpanData),
248    ExportSpan(Arc<AtomicBool>),
249    ForceFlush(SyncSender<OTelSdkResult>),
250    Shutdown(SyncSender<OTelSdkResult>),
251    SetResource(Arc<Resource>),
252}
253
254/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
255/// in batches to the configured `SpanExporter`. This processor is ideal for
256/// high-throughput environments, as it minimizes the overhead of exporting spans
257/// individually. It uses a **dedicated background thread** to manage and export spans
258/// asynchronously, ensuring that the application's main execution flow is not blocked.
259///
260/// This processor supports the following configurations:
261/// - **Queue size**: Maximum number of spans that can be buffered.
262/// - **Batch size**: Maximum number of spans to include in a single export.
263/// - **Scheduled delay**: Frequency at which the batch is exported.
264///
265/// When using this processor with the OTLP Exporter, the following exporter
266/// features are supported:
267/// - `grpc-tonic`: Requires `TracerProvider` to be created within a tokio runtime.
268/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
269///
270/// In other words, other clients like `reqwest` and `hyper` are not supported.
271///
272/// `BatchSpanProcessor` buffers spans in memory and exports them in batches. An
273/// export is triggered when `max_export_batch_size` is reached or every
274/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
275/// the `force_flush` method. Shutdown also triggers an export of all buffered
276/// spans and is recommended to be called before the application exits to ensure
277/// all buffered spans are exported.
278///
279/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
/// is a blocking call, should not be called from your main thread, as this can
/// cause a deadlock. Instead, call `shutdown()` from a separate thread or use
282/// tokio's `spawn_blocking`.
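///
/// For example, on a current-thread runtime the blocking shutdown can be moved
/// off the async thread with `spawn_blocking`. A sketch only, assuming the
/// processor was installed on `provider`:
///
/// ```ignore
/// let provider_clone = provider.clone();
/// tokio::task::spawn_blocking(move || {
///     // Blocking here would stall the only runtime thread; spawn_blocking
///     // moves the wait onto tokio's blocking pool so the exporter can
///     // still make progress.
///     provider_clone.shutdown().expect("shutdown failed");
/// })
/// .await
/// .expect("spawn_blocking task panicked");
/// ```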
283///
284#[derive(Debug)]
285pub struct BatchSpanProcessor {
286    span_sender: SyncSender<SpanData>, // Data channel to store spans
287    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
288    handle: Mutex<Option<thread::JoinHandle<()>>>,
289    forceflush_timeout: Duration,
290    is_shutdown: AtomicBool,
291    dropped_span_count: Arc<AtomicUsize>,
292    export_span_message_sent: Arc<AtomicBool>,
293    current_batch_size: Arc<AtomicUsize>,
294    max_export_batch_size: usize,
295    max_queue_size: usize,
296}
297
298impl BatchSpanProcessor {
299    /// Creates a new instance of `BatchSpanProcessor`.
300    pub fn new<E>(
301        mut exporter: E,
302        config: BatchConfig,
303        //max_queue_size: usize,
304        //scheduled_delay: Duration,
305        //shutdown_timeout: Duration,
306    ) -> Self
307    where
308        E: SpanExporter + Send + 'static,
309    {
310        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
311        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
312        let max_queue_size = config.max_queue_size;
313        let max_export_batch_size = config.max_export_batch_size;
314        let current_batch_size = Arc::new(AtomicUsize::new(0));
315        let current_batch_size_for_thread = current_batch_size.clone();
316
317        let handle = thread::Builder::new()
318            .name("OpenTelemetry.Traces.BatchProcessor".to_string())
319            .spawn(move || {
320                let _suppress_guard = Context::enter_telemetry_suppressed_scope();
321                otel_debug!(
322                    name: "BatchSpanProcessor.ThreadStarted",
323                    interval_in_millisecs = config.scheduled_delay.as_millis(),
324                    max_export_batch_size = config.max_export_batch_size,
325                    max_queue_size = config.max_queue_size,
326                );
327                let mut spans = Vec::with_capacity(config.max_export_batch_size);
328                let mut last_export_time = Instant::now();
329                let current_batch_size = current_batch_size_for_thread;
330                loop {
331                    let remaining_time_option = config
332                        .scheduled_delay
333                        .checked_sub(last_export_time.elapsed());
334                    let remaining_time = match remaining_time_option {
335                        Some(remaining_time) => remaining_time,
336                        None => config.scheduled_delay,
337                    };
338                    match message_receiver.recv_timeout(remaining_time) {
339                        Ok(message) => match message {
340                            BatchMessage::ExportSpan(export_span_message_sent) => {
                                // Reset the export span message sent flag now that it has been processed.
342                                export_span_message_sent.store(false, Ordering::Relaxed);
343                                otel_debug!(
344                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
345                                );
346                                let _ = Self::get_spans_and_export(
347                                    &span_receiver,
348                                    &mut exporter,
349                                    &mut spans,
350                                    &mut last_export_time,
351                                    &current_batch_size,
352                                    &config,
353                                );
354                            }
355                            BatchMessage::ForceFlush(sender) => {
356                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
357                                let result = Self::get_spans_and_export(
358                                    &span_receiver,
359                                    &mut exporter,
360                                    &mut spans,
361                                    &mut last_export_time,
362                                    &current_batch_size,
363                                    &config,
364                                );
365                                let _ = sender.send(result);
366                            }
367                            BatchMessage::Shutdown(sender) => {
368                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
369                                let result = Self::get_spans_and_export(
370                                    &span_receiver,
371                                    &mut exporter,
372                                    &mut spans,
373                                    &mut last_export_time,
374                                    &current_batch_size,
375                                    &config,
376                                );
377                                let _ = sender.send(result);
378
379                                otel_debug!(
380                                    name: "BatchSpanProcessor.ThreadExiting",
381                                    reason = "ShutdownRequested"
382                                );
383                                //
                                // break out of the loop and return from the current background thread.
385                                //
386                                break;
387                            }
388                            BatchMessage::SetResource(resource) => {
389                                exporter.set_resource(&resource);
390                            }
391                        },
392                        Err(RecvTimeoutError::Timeout) => {
393                            otel_debug!(
394                                name: "BatchSpanProcessor.ExportingDueToTimer",
395                            );
396
397                            let _ = Self::get_spans_and_export(
398                                &span_receiver,
399                                &mut exporter,
400                                &mut spans,
401                                &mut last_export_time,
402                                &current_batch_size,
403                                &config,
404                            );
405                        }
406                        Err(RecvTimeoutError::Disconnected) => {
                            // Channel disconnected; the only thing to do is break
                            // out (i.e., exit the thread).
409                            otel_debug!(
410                                name: "BatchSpanProcessor.ThreadExiting",
411                                reason = "MessageSenderDisconnected"
412                            );
413                            break;
414                        }
415                    }
416                }
417                otel_debug!(
418                    name: "BatchSpanProcessor.ThreadStopped"
419                );
420            })
421            .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
422
423        Self {
424            span_sender,
425            message_sender,
426            handle: Mutex::new(Some(handle)),
427            forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
428            is_shutdown: AtomicBool::new(false),
429            dropped_span_count: Arc::new(AtomicUsize::new(0)),
430            max_queue_size,
431            export_span_message_sent: Arc::new(AtomicBool::new(false)),
432            current_batch_size,
433            max_export_batch_size,
434        }
435    }
436
437    /// builder
438    pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
439    where
440        E: SpanExporter + Send + 'static,
441    {
442        BatchSpanProcessorBuilder {
443            exporter,
444            config: BatchConfig::default(),
445        }
446    }
447
    // This method gets up to `max_export_batch_size` spans from the channel and exports them.
449    // It returns the result of the export operation.
450    // It expects the span vec to be empty when it's called.
451    #[inline]
452    fn get_spans_and_export<E>(
453        spans_receiver: &Receiver<SpanData>,
454        exporter: &mut E,
455        spans: &mut Vec<SpanData>,
456        last_export_time: &mut Instant,
457        current_batch_size: &AtomicUsize,
458        config: &BatchConfig,
459    ) -> OTelSdkResult
460    where
461        E: SpanExporter + Send + Sync + 'static,
462    {
        // Get up to `max_export_batch_size` spans from the channel and push them to the span vec
464        while let Ok(span) = spans_receiver.try_recv() {
465            spans.push(span);
466            if spans.len() == config.max_export_batch_size {
467                break;
468            }
469        }
470
471        let count_of_spans = spans.len(); // Count of spans that will be exported
472        let result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting
473
474        current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
475        result
476    }
477
478    #[allow(clippy::vec_box)]
479    fn export_batch_sync<E>(
480        exporter: &mut E,
481        batch: &mut Vec<SpanData>,
482        last_export_time: &mut Instant,
483    ) -> OTelSdkResult
484    where
485        E: SpanExporter + Send + Sync + 'static,
486    {
487        *last_export_time = Instant::now();
488
489        if batch.is_empty() {
490            return OTelSdkResult::Ok(());
491        }
492
493        let export = exporter.export(batch.split_off(0));
494        let export_result = futures_executor::block_on(export);
495
496        match export_result {
497            Ok(_) => OTelSdkResult::Ok(()),
498            Err(err) => {
499                otel_error!(
500                    name: "BatchSpanProcessor.ExportError",
501                    error = format!("{}", err)
502                );
503                Err(OTelSdkError::InternalFailure(err.to_string()))
504            }
505        }
506    }
507}
508
509impl SpanProcessor for BatchSpanProcessor {
510    /// Handles span start.
511    fn on_start(&self, _span: &mut Span, _cx: &Context) {
512        // Ignored
513    }
514
515    /// Handles span end.
516    fn on_end(&self, span: SpanData) {
517        if self.is_shutdown.load(Ordering::Relaxed) {
518            // this is a warning, as the user is trying to emit after the processor has been shutdown
519            otel_warn!(
520                name: "BatchSpanProcessor.Emit.ProcessorShutdown",
521                message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
522            );
523            return;
524        }
525        let result = self.span_sender.try_send(span);
526
527        if result.is_err() {
528            // Increment dropped span count. The first time we have to drop a span,
529            // emit a warning.
530            if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
531                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
532                    message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
533            }
534        }
        // Increment the current batch size and check if it has reached the max export batch size.
537        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
538        {
            // Check if a control message for exporting spans has already been sent to the worker thread.
540            // If not, send a control message to export spans.
541            // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
542
543            if !self.export_span_message_sent.load(Ordering::Relaxed) {
                // This is a cost-efficient check as atomic load operations do not require exclusive access to the cache line.
                // Perform an atomic swap on `export_span_message_sent` ONLY when the atomic load operation above returns false.
                // Atomic swap/compare_exchange operations require exclusive access to the cache line on most processor architectures.
                // We could have used compare_exchange here as well, but it's more verbose than swap.
548                if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
549                    match self.message_sender.try_send(BatchMessage::ExportSpan(
550                        self.export_span_message_sent.clone(),
551                    )) {
552                        Ok(_) => {
553                            // Control message sent successfully.
554                        }
555                        Err(_err) => {
556                            // TODO: Log error
557                            // If the control message could not be sent, reset the `export_span_message_sent` flag.
558                            self.export_span_message_sent
559                                .store(false, Ordering::Relaxed);
560                        }
561                    }
562                }
563            }
564        }
565    }
566
567    /// Flushes all pending spans.
568    fn force_flush(&self) -> OTelSdkResult {
569        if self.is_shutdown.load(Ordering::Relaxed) {
570            return Err(OTelSdkError::AlreadyShutdown);
571        }
572        let (sender, receiver) = sync_channel(1);
573        self.message_sender
574            .try_send(BatchMessage::ForceFlush(sender))
575            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
576
577        receiver
578            .recv_timeout(self.forceflush_timeout)
579            .map_err(|_| OTelSdkError::Timeout(self.forceflush_timeout))?
580    }
581
582    /// Shuts down the processor.
583    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
584        if self.is_shutdown.swap(true, Ordering::Relaxed) {
585            return Err(OTelSdkError::AlreadyShutdown);
586        }
587        let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
588        let max_queue_size = self.max_queue_size;
589        if dropped_spans > 0 {
590            otel_warn!(
591                name: "BatchSpanProcessor.SpansDropped",
592                dropped_span_count = dropped_spans,
593                max_queue_size = max_queue_size,
                message = "Spans were dropped due to the queue being full or other errors. The count represents the total number of spans dropped over the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between exports."
595            );
596        }
597
598        let (sender, receiver) = sync_channel(1);
599        self.message_sender
600            .try_send(BatchMessage::Shutdown(sender))
601            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
602
603        let result = receiver
604            .recv_timeout(timeout)
605            .map_err(|_| OTelSdkError::Timeout(timeout))?;
606        if let Some(handle) = self.handle.lock().unwrap().take() {
607            if let Err(err) = handle.join() {
608                return Err(OTelSdkError::InternalFailure(format!(
609                    "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}",
610                    err
611                )));
612            }
613        }
614        result
615    }
616
617    /// Set the resource for the processor.
618    fn set_resource(&mut self, resource: &Resource) {
619        let resource = Arc::new(resource.clone());
620        let _ = self
621            .message_sender
622            .try_send(BatchMessage::SetResource(resource));
623    }
624}
625
/// Builder for [`BatchSpanProcessor`].
627#[derive(Debug, Default)]
628pub struct BatchSpanProcessorBuilder<E>
629where
630    E: SpanExporter + Send + 'static,
631{
632    exporter: E,
633    config: BatchConfig,
634}
635
636impl<E> BatchSpanProcessorBuilder<E>
637where
638    E: SpanExporter + Send + 'static,
639{
640    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
641    pub fn with_batch_config(self, config: BatchConfig) -> Self {
642        BatchSpanProcessorBuilder { config, ..self }
643    }
644
645    /// Build a new instance of `BatchSpanProcessor`.
646    pub fn build(self) -> BatchSpanProcessor {
647        BatchSpanProcessor::new(self.exporter, self.config)
648    }
649}
650
651/// Batch span processor configuration.
652/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
653#[derive(Debug)]
654pub struct BatchConfig {
655    /// The maximum queue size to buffer spans for delayed processing. If the
    /// queue gets full, it drops the spans. The default value is 2048.
657    pub(crate) max_queue_size: usize,
658
    /// The delay interval between two consecutive processing of batches. The
    /// default value is 5 seconds.
661    pub(crate) scheduled_delay: Duration,
662
663    #[allow(dead_code)]
664    /// The maximum number of spans to process in a single batch. If there are
665    /// more than one batch worth of spans then it processes multiple batches
666    /// of spans one batch after the other without any delay. The default value
667    /// is 512.
668    pub(crate) max_export_batch_size: usize,
669
670    #[allow(dead_code)]
671    /// The maximum duration to export a batch of data.
672    pub(crate) max_export_timeout: Duration,
673
674    #[allow(dead_code)]
675    /// Maximum number of concurrent exports
676    ///
677    /// Limits the number of spawned tasks for exports and thus memory consumed
678    /// by an exporter. A value of 1 will cause exports to be performed
679    /// synchronously on the BatchSpanProcessor task.
680    pub(crate) max_concurrent_exports: usize,
681}
682
683impl Default for BatchConfig {
684    fn default() -> Self {
685        BatchConfigBuilder::default().build()
686    }
687}
688
689/// A builder for creating [`BatchConfig`] instances.
690#[derive(Debug)]
691pub struct BatchConfigBuilder {
692    max_queue_size: usize,
693    scheduled_delay: Duration,
694    max_export_batch_size: usize,
695    max_export_timeout: Duration,
696    max_concurrent_exports: usize,
697}
698
699impl Default for BatchConfigBuilder {
700    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
    /// The values are overridden by environment variables if set.
702    /// The supported environment variables are:
703    /// * `OTEL_BSP_MAX_QUEUE_SIZE`
704    /// * `OTEL_BSP_SCHEDULE_DELAY`
705    /// * `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
706    /// * `OTEL_BSP_EXPORT_TIMEOUT`
707    /// * `OTEL_BSP_MAX_CONCURRENT_EXPORTS`
708    ///
709    /// Note: Programmatic configuration overrides any value set via the environment variable.
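    ///
    /// For example (a sketch): environment variables are read when the builder
    /// is created, and explicit setters then take precedence.
    ///
    /// ```ignore
    /// // With OTEL_BSP_MAX_QUEUE_SIZE=4096 set in the environment:
    /// use opentelemetry_sdk::trace::BatchConfigBuilder;
    ///
    /// let config = BatchConfigBuilder::default() // max_queue_size picked up from the env var
    ///     .with_scheduled_delay(std::time::Duration::from_secs(1)) // overrides OTEL_BSP_SCHEDULE_DELAY
    ///     .build();
    /// ```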
710    fn default() -> Self {
711        BatchConfigBuilder {
712            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
713            scheduled_delay: OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
714            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
715            max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
716            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
717        }
718        .init_from_env_vars()
719    }
720}
721
722impl BatchConfigBuilder {
723    /// Set max_queue_size for [`BatchConfigBuilder`].
724    /// It's the maximum queue size to buffer spans for delayed processing.
    /// If the queue gets full, it drops the spans.
726    /// The default value is 2048.
727    ///
728    /// Corresponding environment variable: `OTEL_BSP_MAX_QUEUE_SIZE`.
729    ///
730    /// Note: Programmatically setting this will override any value set via the environment variable.
731    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
732        self.max_queue_size = max_queue_size;
733        self
734    }
735
736    /// Set max_export_batch_size for [`BatchConfigBuilder`].
737    /// It's the maximum number of spans to process in a single batch. If there are
738    /// more than one batch worth of spans then it processes multiple batches
739    /// of spans one batch after the other without any delay. The default value
740    /// is 512.
741    ///
742    /// Corresponding environment variable: `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`.
743    ///
744    /// Note: Programmatically setting this will override any value set via the environment variable.
745    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
746        self.max_export_batch_size = max_export_batch_size;
747        self
748    }
749
750    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
751    /// Set max_concurrent_exports for [`BatchConfigBuilder`].
752    /// It's the maximum number of concurrent exports.
753    /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
754    /// The default value is 1.
    /// A value of 1 causes exports to be performed
    /// synchronously on the BatchSpanProcessor task.
758    ///
759    /// Corresponding environment variable: `OTEL_BSP_MAX_CONCURRENT_EXPORTS`.
760    ///
761    /// Note: Programmatically setting this will override any value set via the environment variable.
762    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
763        self.max_concurrent_exports = max_concurrent_exports;
764        self
765    }
766
767    /// Set scheduled_delay_duration for [`BatchConfigBuilder`].
    /// It's the delay interval between two consecutive processing of batches.
769    /// The default value is 5000 milliseconds.
770    ///
771    /// Corresponding environment variable: `OTEL_BSP_SCHEDULE_DELAY`.
772    ///
773    /// Note: Programmatically setting this will override any value set via the environment variable.
774    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
775        self.scheduled_delay = scheduled_delay;
776        self
777    }
778
779    /// Set max_export_timeout for [`BatchConfigBuilder`].
780    /// It's the maximum duration to export a batch of data.
    /// The default value is 30000 milliseconds.
782    ///
783    /// Corresponding environment variable: `OTEL_BSP_EXPORT_TIMEOUT`.
784    ///
785    /// Note: Programmatically setting this will override any value set via the environment variable.
786    #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
787    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
788        self.max_export_timeout = max_export_timeout;
789        self
790    }
791
792    /// Builds a `BatchConfig` enforcing the following invariants:
793    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
794    pub fn build(self) -> BatchConfig {
795        // max export batch size must be less or equal to max queue size.
796        // we set max export batch size to max queue size if it's larger than max queue size.
797        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
798
799        BatchConfig {
800            max_queue_size: self.max_queue_size,
801            scheduled_delay: self.scheduled_delay,
802            max_export_timeout: self.max_export_timeout,
803            max_concurrent_exports: self.max_concurrent_exports,
804            max_export_batch_size,
805        }
806    }
807
808    fn init_from_env_vars(mut self) -> Self {
809        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
810            .ok()
811            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
812        {
813            self.max_concurrent_exports = max_concurrent_exports;
814        }
815
816        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
817            .ok()
818            .and_then(|queue_size| usize::from_str(&queue_size).ok())
819        {
820            self.max_queue_size = max_queue_size;
821        }
822
823        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
824            .ok()
825            .and_then(|delay| u64::from_str(&delay).ok())
826        {
827            self.scheduled_delay = Duration::from_millis(scheduled_delay);
828        }
829
830        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
831            .ok()
832            .and_then(|batch_size| usize::from_str(&batch_size).ok())
833        {
834            self.max_export_batch_size = max_export_batch_size;
835        }
836
837        // max export batch size must be less or equal to max queue size.
838        // we set max export batch size to max queue size if it's larger than max queue size.
839        if self.max_export_batch_size > self.max_queue_size {
840            self.max_export_batch_size = self.max_queue_size;
841        }
842
843        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
844            .ok()
845            .and_then(|timeout| u64::from_str(&timeout).ok())
846        {
847            self.max_export_timeout = Duration::from_millis(max_export_timeout);
848        }
849
850        self
851    }
852}
853
854#[cfg(all(test, feature = "testing", feature = "trace"))]
855mod tests {
856    // cargo test trace::span_processor::tests:: --features=testing
857    use super::{
858        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
859        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
860        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
861    };
862    use crate::error::OTelSdkResult;
863    use crate::testing::trace::new_test_export_span_data;
864    use crate::trace::span_processor::{
865        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
866        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
867    };
868    use crate::trace::InMemorySpanExporterBuilder;
869    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
870    use crate::trace::{SpanData, SpanExporter};
871    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
872    use std::fmt::Debug;
873    use std::time::Duration;
874
875    #[test]
876    fn simple_span_processor_on_end_calls_export() {
877        let exporter = InMemorySpanExporterBuilder::new().build();
878        let processor = SimpleSpanProcessor::new(exporter.clone());
879        let span_data = new_test_export_span_data();
880        processor.on_end(span_data.clone());
881        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
882        let _result = processor.shutdown();
883    }
884
885    #[test]
886    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
887        let exporter = InMemorySpanExporterBuilder::new().build();
888        let processor = SimpleSpanProcessor::new(exporter.clone());
889        let unsampled = SpanData {
890            span_context: SpanContext::empty_context(),
891            parent_span_id: SpanId::INVALID,
892            span_kind: SpanKind::Internal,
893            name: "opentelemetry".into(),
894            start_time: opentelemetry::time::now(),
895            end_time: opentelemetry::time::now(),
896            attributes: Vec::new(),
897            dropped_attributes_count: 0,
898            events: SpanEvents::default(),
899            links: SpanLinks::default(),
900            status: Status::Unset,
901            instrumentation_scope: Default::default(),
902        };
903        processor.on_end(unsampled);
904        assert!(exporter.get_finished_spans().unwrap().is_empty());
905    }
906
907    #[test]
908    fn simple_span_processor_shutdown_calls_shutdown() {
909        let exporter = InMemorySpanExporterBuilder::new().build();
910        let processor = SimpleSpanProcessor::new(exporter.clone());
911        let span_data = new_test_export_span_data();
912        processor.on_end(span_data.clone());
913        assert!(!exporter.get_finished_spans().unwrap().is_empty());
914        let _result = processor.shutdown();
915        // Assume shutdown is called by ensuring spans are empty in the exporter
916        assert!(exporter.get_finished_spans().unwrap().is_empty());
917    }
918
919    #[test]
920    fn test_default_const_values() {
921        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
922        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
923        assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
924        assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT.as_millis(), 5000);
925        assert_eq!(
926            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
927            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
928        );
929        assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
930        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
931        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30000);
932    }
933
934    #[test]
935    fn test_default_batch_config_adheres_to_specification() {
936        let env_vars = vec![
937            OTEL_BSP_SCHEDULE_DELAY,
938            OTEL_BSP_EXPORT_TIMEOUT,
939            OTEL_BSP_MAX_QUEUE_SIZE,
940            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
941            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
942        ];
943
944        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
945
946        assert_eq!(
947            config.max_concurrent_exports,
948            OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
949        );
950        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
951        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
952        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
953        assert_eq!(
954            config.max_export_batch_size,
955            OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
956        );
957    }
958
959    #[test]
960    fn test_code_based_config_overrides_env_vars() {
961        let env_vars = vec![
962            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
963            (OTEL_BSP_MAX_CONCURRENT_EXPORTS, Some("5")),
964            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
965            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
966            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
967        ];
968
969        temp_env::with_vars(env_vars, || {
970            let config = BatchConfigBuilder::default()
971                .with_max_export_batch_size(512)
972                .with_max_queue_size(2048)
973                .with_scheduled_delay(Duration::from_millis(1000));
974            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
975            let config = {
976                config
977                    .with_max_concurrent_exports(10)
978                    .with_max_export_timeout(Duration::from_millis(2000))
979            };
980            let config = config.build();
981
982            assert_eq!(config.max_export_batch_size, 512);
983            assert_eq!(config.max_queue_size, 2048);
984            assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
985            #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
986            {
987                assert_eq!(config.max_concurrent_exports, 10);
988                assert_eq!(config.max_export_timeout, Duration::from_millis(2000));
989            }
990        });
991    }
992
993    #[test]
994    fn test_batch_config_configurable_by_env_vars() {
995        let env_vars = vec![
996            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
997            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
998            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
999            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1000        ];
1001
1002        let config = temp_env::with_vars(env_vars, BatchConfig::default);
1003
1004        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
1005        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
1006        assert_eq!(config.max_queue_size, 4096);
1007        assert_eq!(config.max_export_batch_size, 1024);
1008    }
1009
1010    #[test]
1011    fn test_batch_config_max_export_batch_size_validation() {
1012        let env_vars = vec![
1013            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
1014            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1015        ];
1016
1017        let config = temp_env::with_vars(env_vars, BatchConfig::default);
1018
1019        assert_eq!(config.max_queue_size, 256);
1020        assert_eq!(config.max_export_batch_size, 256);
1021        assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
1022        assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
1023    }
1024
1025    #[test]
1026    fn test_batch_config_with_fields() {
1027        let batch = BatchConfigBuilder::default()
1028            .with_max_export_batch_size(10)
1029            .with_scheduled_delay(Duration::from_millis(10))
1030            .with_max_queue_size(10);
1031        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1032        let batch = {
1033            batch
1034                .with_max_concurrent_exports(10)
1035                .with_max_export_timeout(Duration::from_millis(10))
1036        };
1037        let batch = batch.build();
1038        assert_eq!(batch.max_export_batch_size, 10);
1039        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
1040        assert_eq!(batch.max_queue_size, 10);
1041        #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1042        {
1043            assert_eq!(batch.max_concurrent_exports, 10);
1044            assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
1045        }
1046    }
1047
1048    // Helper function to create a default test span
1049    fn create_test_span(name: &str) -> SpanData {
1050        SpanData {
1051            span_context: SpanContext::empty_context(),
1052            parent_span_id: SpanId::INVALID,
1053            span_kind: SpanKind::Internal,
1054            name: name.to_string().into(),
1055            start_time: opentelemetry::time::now(),
1056            end_time: opentelemetry::time::now(),
1057            attributes: Vec::new(),
1058            dropped_attributes_count: 0,
1059            events: SpanEvents::default(),
1060            links: SpanLinks::default(),
1061            status: Status::Unset,
1062            instrumentation_scope: Default::default(),
1063        }
1064    }
1065
1066    use crate::Resource;
1067    use opentelemetry::{Key, KeyValue, Value};
1068    use std::sync::{atomic::Ordering, Arc, Mutex};
1069
1070    // Mock exporter to test functionality
1071    #[derive(Debug)]
1072    struct MockSpanExporter {
1073        exported_spans: Arc<Mutex<Vec<SpanData>>>,
1074        exported_resource: Arc<Mutex<Option<Resource>>>,
1075    }
1076
1077    impl MockSpanExporter {
1078        fn new() -> Self {
1079            Self {
1080                exported_spans: Arc::new(Mutex::new(Vec::new())),
1081                exported_resource: Arc::new(Mutex::new(None)),
1082            }
1083        }
1084    }
1085
1086    impl SpanExporter for MockSpanExporter {
1087        async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1088            let exported_spans = self.exported_spans.clone();
1089            exported_spans.lock().unwrap().extend(batch);
1090            Ok(())
1091        }
1092
1093        fn shutdown(&mut self) -> OTelSdkResult {
1094            Ok(())
1095        }
1096        fn set_resource(&mut self, resource: &Resource) {
1097            let mut exported_resource = self.exported_resource.lock().unwrap();
1098            *exported_resource = Some(resource.clone());
1099        }
1100    }
1101
1102    #[test]
1103    fn batchspanprocessor_handles_on_end() {
1104        let exporter = MockSpanExporter::new();
1105        let exporter_shared = exporter.exported_spans.clone();
1106        let config = BatchConfigBuilder::default()
1107            .with_max_queue_size(10)
1108            .with_max_export_batch_size(10)
1109            .with_scheduled_delay(Duration::from_secs(5))
1110            .build();
1111        let processor = BatchSpanProcessor::new(exporter, config);
1112
1113        let test_span = create_test_span("test_span");
1114        processor.on_end(test_span.clone());
1115
1116        // Wait for flush interval to ensure the span is processed
1117        std::thread::sleep(Duration::from_secs(6));
1118
1119        let exported_spans = exporter_shared.lock().unwrap();
1120        assert_eq!(exported_spans.len(), 1);
1121        assert_eq!(exported_spans[0].name, "test_span");
1122    }
1123
1124    #[test]
1125    fn batchspanprocessor_force_flush() {
1126        let exporter = MockSpanExporter::new();
1127        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1128        let config = BatchConfigBuilder::default()
1129            .with_max_queue_size(10)
1130            .with_max_export_batch_size(10)
1131            .with_scheduled_delay(Duration::from_secs(5))
1132            .build();
1133        let processor = BatchSpanProcessor::new(exporter, config);
1134
1135        // Create a test span and send it to the processor
1136        let test_span = create_test_span("force_flush_span");
1137        processor.on_end(test_span.clone());
1138
1139        // Call force_flush to immediately export the spans
1140        let flush_result = processor.force_flush();
1141        assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
1142
1143        // Verify the exported spans in the mock exporter
1144        let exported_spans = exporter_shared.lock().unwrap();
1145        assert_eq!(
1146            exported_spans.len(),
1147            1,
1148            "Unexpected number of exported spans"
1149        );
1150        assert_eq!(exported_spans[0].name, "force_flush_span");
1151    }
1152
1153    #[test]
1154    fn batchspanprocessor_shutdown() {
1155        let exporter = MockSpanExporter::new();
1156        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1157        let config = BatchConfigBuilder::default()
1158            .with_max_queue_size(10)
1159            .with_max_export_batch_size(10)
1160            .with_scheduled_delay(Duration::from_secs(5))
1161            .build();
1162        let processor = BatchSpanProcessor::new(exporter, config);
1163
1164        // Create a test span and send it to the processor
1165        let test_span = create_test_span("shutdown_span");
1166        processor.on_end(test_span.clone());
1167
1168        // Call shutdown to flush and export all pending spans
1169        let shutdown_result = processor.shutdown();
1170        assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly");
1171
1172        // Verify the exported spans in the mock exporter
1173        let exported_spans = exporter_shared.lock().unwrap();
1174        assert_eq!(
1175            exported_spans.len(),
1176            1,
1177            "Unexpected number of exported spans"
1178        );
1179        assert_eq!(exported_spans[0].name, "shutdown_span");
1180
1181        // Ensure further calls to shutdown are idempotent
1182        let second_shutdown_result = processor.shutdown();
1183        assert!(
1184            second_shutdown_result.is_err(),
1185            "Shutdown should fail when called a second time"
1186        );
1187    }
1188
1189    #[test]
1190    fn batchspanprocessor_handles_dropped_spans() {
1191        let exporter = MockSpanExporter::new();
1192        let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
1193        let config = BatchConfigBuilder::default()
1194            .with_max_queue_size(2) // Small queue size to test span dropping
1195            .with_scheduled_delay(Duration::from_secs(5))
1196            .build();
1197        let processor = BatchSpanProcessor::new(exporter, config);
1198
1199        // Create test spans and send them to the processor
1200        let span1 = create_test_span("span1");
1201        let span2 = create_test_span("span2");
1202        let span3 = create_test_span("span3"); // This span should be dropped
1203
1204        processor.on_end(span1.clone());
1205        processor.on_end(span2.clone());
1206        processor.on_end(span3.clone()); // This span exceeds the queue size
1207
        // Wait for the size-triggered export to complete (max_export_batch_size is clamped to the queue size of 2)
1209        std::thread::sleep(Duration::from_secs(3));
1210
1211        let exported_spans = exporter_shared.lock().unwrap();
1212
1213        // Verify that only the first two spans are exported
1214        assert_eq!(
1215            exported_spans.len(),
1216            2,
1217            "Unexpected number of exported spans"
1218        );
1219        assert!(exported_spans.iter().any(|s| s.name == "span1"));
1220        assert!(exported_spans.iter().any(|s| s.name == "span2"));
1221
1222        // Ensure the third span is dropped
1223        assert!(
1224            !exported_spans.iter().any(|s| s.name == "span3"),
1225            "Span3 should have been dropped"
1226        );
1227
1228        // Verify dropped spans count (if accessible in your implementation)
1229        let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
1230        assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
1231    }
1232
1233    #[test]
1234    fn validate_span_attributes_exported_correctly() {
1235        let exporter = MockSpanExporter::new();
1236        let exporter_shared = exporter.exported_spans.clone();
1237        let config = BatchConfigBuilder::default().build();
1238        let processor = BatchSpanProcessor::new(exporter, config);
1239
1240        // Create a span with attributes
1241        let mut span_data = create_test_span("attribute_validation");
1242        span_data.attributes = vec![
1243            KeyValue::new("key1", "value1"),
1244            KeyValue::new("key2", "value2"),
1245        ];
1246        processor.on_end(span_data.clone());
1247
1248        // Force flush to export the span
1249        let _ = processor.force_flush();
1250
1251        // Validate the exported attributes
1252        let exported_spans = exporter_shared.lock().unwrap();
1253        assert_eq!(exported_spans.len(), 1);
1254        let exported_span = &exported_spans[0];
1255        assert!(exported_span
1256            .attributes
1257            .contains(&KeyValue::new("key1", "value1")));
1258        assert!(exported_span
1259            .attributes
1260            .contains(&KeyValue::new("key2", "value2")));
1261    }
1262
1263    #[test]
1264    fn batchspanprocessor_sets_and_exports_with_resource() {
1265        let exporter = MockSpanExporter::new();
1266        let exporter_shared = exporter.exported_spans.clone();
1267        let resource_shared = exporter.exported_resource.clone();
1268        let config = BatchConfigBuilder::default().build();
1269        let mut processor = BatchSpanProcessor::new(exporter, config);
1270
1271        // Set a resource for the processor
1272        let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
1273        processor.set_resource(&resource);
1274
1275        // Create a span and send it to the processor
1276        let test_span = create_test_span("resource_test");
1277        processor.on_end(test_span.clone());
1278
1279        // Force flush to ensure the span is exported
1280        let _ = processor.force_flush();
1281
1282        // Validate spans are exported
1283        let exported_spans = exporter_shared.lock().unwrap();
1284        assert_eq!(exported_spans.len(), 1);
1285
1286        // Validate the resource is correctly set in the exporter
1287        let exported_resource = resource_shared.lock().unwrap();
1288        assert!(exported_resource.is_some());
1289        assert_eq!(
1290            exported_resource
1291                .as_ref()
1292                .unwrap()
1293                .get(&Key::new("service.name")),
1294            Some(Value::from("test_service"))
1295        );
1296    }
1297
1298    #[tokio::test(flavor = "current_thread")]
1299    async fn test_batch_processor_current_thread_runtime() {
1300        let exporter = MockSpanExporter::new();
1301        let exporter_shared = exporter.exported_spans.clone();
1302
1303        let config = BatchConfigBuilder::default()
1304            .with_max_queue_size(5)
1305            .with_max_export_batch_size(3)
1306            .build();
1307
1308        let processor = BatchSpanProcessor::new(exporter, config);
1309
1310        for _ in 0..4 {
1311            let span = new_test_export_span_data();
1312            processor.on_end(span);
1313        }
1314
1315        processor.force_flush().unwrap();
1316
1317        let exported_spans = exporter_shared.lock().unwrap();
1318        assert_eq!(exported_spans.len(), 4);
1319    }
1320
1321    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1322    async fn test_batch_processor_multi_thread_count_1_runtime() {
1323        let exporter = MockSpanExporter::new();
1324        let exporter_shared = exporter.exported_spans.clone();
1325
1326        let config = BatchConfigBuilder::default()
1327            .with_max_queue_size(5)
1328            .with_max_export_batch_size(3)
1329            .build();
1330
1331        let processor = BatchSpanProcessor::new(exporter, config);
1332
1333        for _ in 0..4 {
1334            let span = new_test_export_span_data();
1335            processor.on_end(span);
1336        }
1337
1338        processor.force_flush().unwrap();
1339
1340        let exported_spans = exporter_shared.lock().unwrap();
1341        assert_eq!(exported_spans.len(), 4);
1342    }
1343
1344    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1345    async fn test_batch_processor_multi_thread() {
1346        let exporter = MockSpanExporter::new();
1347        let exporter_shared = exporter.exported_spans.clone();
1348
1349        let config = BatchConfigBuilder::default()
1350            .with_max_queue_size(20)
1351            .with_max_export_batch_size(5)
1352            .build();
1353
1354        // Create the processor with the thread-safe exporter
1355        let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1356
1357        let mut handles = vec![];
1358        for _ in 0..10 {
1359            let processor_clone = Arc::clone(&processor);
1360            let handle = tokio::spawn(async move {
1361                let span = new_test_export_span_data();
1362                processor_clone.on_end(span);
1363            });
1364            handles.push(handle);
1365        }
1366
1367        for handle in handles {
1368            handle.await.unwrap();
1369        }
1370
1371        processor.force_flush().unwrap();
1372
1373        // Verify exported spans
1374        let exported_spans = exporter_shared.lock().unwrap();
1375        assert_eq!(exported_spans.len(), 10);
1376    }
1377}