1use crate::error::{OTelSdkError, OTelSdkResult};
38use crate::resource::Resource;
39use crate::trace::Span;
40use crate::trace::{SpanData, SpanExporter};
41use opentelemetry::Context;
42use opentelemetry::{otel_debug, otel_error, otel_warn};
43use std::cmp::min;
44use std::sync::atomic::{AtomicUsize, Ordering};
45use std::sync::{Arc, Mutex};
46use std::{env, str::FromStr, time::Duration};
47
48use std::sync::atomic::AtomicBool;
49use std::thread;
50use std::time::Instant;
51
52pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
54pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: Duration = Duration::from_millis(5_000);
56pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
58pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
60pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
62pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
64pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
66pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_millis(30_000);
68pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
71pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
73
74pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
78 fn on_start(&self, span: &mut Span, cx: &Context);
82 fn on_end(&self, span: SpanData);
87 fn force_flush(&self) -> OTelSdkResult;
89 fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
94 fn shutdown(&self) -> OTelSdkResult {
96 self.shutdown_with_timeout(Duration::from_secs(5))
97 }
98 fn set_resource(&mut self, _resource: &Resource) {}
100}
101
102#[derive(Debug)]
118pub struct SimpleSpanProcessor<T: SpanExporter> {
119 exporter: Mutex<T>,
120}
121
122impl<T: SpanExporter> SimpleSpanProcessor<T> {
123 pub fn new(exporter: T) -> Self {
125 Self {
126 exporter: Mutex::new(exporter),
127 }
128 }
129}
130
131impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
132 fn on_start(&self, _span: &mut Span, _cx: &Context) {
133 }
135
136 fn on_end(&self, span: SpanData) {
137 if !span.span_context.is_sampled() {
138 return;
139 }
140
141 let result = self
142 .exporter
143 .lock()
144 .map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
145 .and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));
146
147 if let Err(err) = result {
148 otel_debug!(
150 name: "SimpleProcessor.OnEnd.Error",
151 reason = format!("{:?}", err)
152 );
153 }
154 }
155
156 fn force_flush(&self) -> OTelSdkResult {
157 Ok(())
159 }
160
161 fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
162 if let Ok(mut exporter) = self.exporter.lock() {
163 exporter.shutdown_with_timeout(timeout)
164 } else {
165 Err(OTelSdkError::InternalFailure(
166 "SimpleSpanProcessor mutex poison at shutdown".into(),
167 ))
168 }
169 }
170
171 fn set_resource(&mut self, resource: &Resource) {
172 if let Ok(mut exporter) = self.exporter.lock() {
173 exporter.set_resource(resource);
174 }
175 }
176}
177
178use std::sync::mpsc::sync_channel;
239use std::sync::mpsc::Receiver;
240use std::sync::mpsc::RecvTimeoutError;
241use std::sync::mpsc::SyncSender;
242
243#[allow(clippy::large_enum_variant)]
245#[derive(Debug)]
246enum BatchMessage {
247 ExportSpan(Arc<AtomicBool>),
249 ForceFlush(SyncSender<OTelSdkResult>),
250 Shutdown(SyncSender<OTelSdkResult>),
251 SetResource(Arc<Resource>),
252}
253
254#[derive(Debug)]
285pub struct BatchSpanProcessor {
286 span_sender: SyncSender<SpanData>, message_sender: SyncSender<BatchMessage>, handle: Mutex<Option<thread::JoinHandle<()>>>,
289 forceflush_timeout: Duration,
290 is_shutdown: AtomicBool,
291 dropped_span_count: Arc<AtomicUsize>,
292 export_span_message_sent: Arc<AtomicBool>,
293 current_batch_size: Arc<AtomicUsize>,
294 max_export_batch_size: usize,
295 max_queue_size: usize,
296}
297
298impl BatchSpanProcessor {
299 pub fn new<E>(
301 mut exporter: E,
302 config: BatchConfig,
303 ) -> Self
307 where
308 E: SpanExporter + Send + 'static,
309 {
310 let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
311 let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); let max_queue_size = config.max_queue_size;
313 let max_export_batch_size = config.max_export_batch_size;
314 let current_batch_size = Arc::new(AtomicUsize::new(0));
315 let current_batch_size_for_thread = current_batch_size.clone();
316
317 let handle = thread::Builder::new()
318 .name("OpenTelemetry.Traces.BatchProcessor".to_string())
319 .spawn(move || {
320 let _suppress_guard = Context::enter_telemetry_suppressed_scope();
321 otel_debug!(
322 name: "BatchSpanProcessor.ThreadStarted",
323 interval_in_millisecs = config.scheduled_delay.as_millis(),
324 max_export_batch_size = config.max_export_batch_size,
325 max_queue_size = config.max_queue_size,
326 );
327 let mut spans = Vec::with_capacity(config.max_export_batch_size);
328 let mut last_export_time = Instant::now();
329 let current_batch_size = current_batch_size_for_thread;
330 loop {
331 let remaining_time_option = config
332 .scheduled_delay
333 .checked_sub(last_export_time.elapsed());
334 let remaining_time = match remaining_time_option {
335 Some(remaining_time) => remaining_time,
336 None => config.scheduled_delay,
337 };
338 match message_receiver.recv_timeout(remaining_time) {
339 Ok(message) => match message {
340 BatchMessage::ExportSpan(export_span_message_sent) => {
341 export_span_message_sent.store(false, Ordering::Relaxed);
343 otel_debug!(
344 name: "BatchSpanProcessor.ExportingDueToBatchSize",
345 );
346 let _ = Self::get_spans_and_export(
347 &span_receiver,
348 &mut exporter,
349 &mut spans,
350 &mut last_export_time,
351 ¤t_batch_size,
352 &config,
353 );
354 }
355 BatchMessage::ForceFlush(sender) => {
356 otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
357 let result = Self::get_spans_and_export(
358 &span_receiver,
359 &mut exporter,
360 &mut spans,
361 &mut last_export_time,
362 ¤t_batch_size,
363 &config,
364 );
365 let _ = sender.send(result);
366 }
367 BatchMessage::Shutdown(sender) => {
368 otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
369 let result = Self::get_spans_and_export(
370 &span_receiver,
371 &mut exporter,
372 &mut spans,
373 &mut last_export_time,
374 ¤t_batch_size,
375 &config,
376 );
377 let _ = sender.send(result);
378
379 otel_debug!(
380 name: "BatchSpanProcessor.ThreadExiting",
381 reason = "ShutdownRequested"
382 );
383 break;
387 }
388 BatchMessage::SetResource(resource) => {
389 exporter.set_resource(&resource);
390 }
391 },
392 Err(RecvTimeoutError::Timeout) => {
393 otel_debug!(
394 name: "BatchSpanProcessor.ExportingDueToTimer",
395 );
396
397 let _ = Self::get_spans_and_export(
398 &span_receiver,
399 &mut exporter,
400 &mut spans,
401 &mut last_export_time,
402 ¤t_batch_size,
403 &config,
404 );
405 }
406 Err(RecvTimeoutError::Disconnected) => {
407 otel_debug!(
410 name: "BatchSpanProcessor.ThreadExiting",
411 reason = "MessageSenderDisconnected"
412 );
413 break;
414 }
415 }
416 }
417 otel_debug!(
418 name: "BatchSpanProcessor.ThreadStopped"
419 );
420 })
421 .expect("Failed to spawn thread"); Self {
424 span_sender,
425 message_sender,
426 handle: Mutex::new(Some(handle)),
427 forceflush_timeout: Duration::from_secs(5), is_shutdown: AtomicBool::new(false),
429 dropped_span_count: Arc::new(AtomicUsize::new(0)),
430 max_queue_size,
431 export_span_message_sent: Arc::new(AtomicBool::new(false)),
432 current_batch_size,
433 max_export_batch_size,
434 }
435 }
436
437 pub fn builder<E>(exporter: E) -> BatchSpanProcessorBuilder<E>
439 where
440 E: SpanExporter + Send + 'static,
441 {
442 BatchSpanProcessorBuilder {
443 exporter,
444 config: BatchConfig::default(),
445 }
446 }
447
448 #[inline]
452 fn get_spans_and_export<E>(
453 spans_receiver: &Receiver<SpanData>,
454 exporter: &mut E,
455 spans: &mut Vec<SpanData>,
456 last_export_time: &mut Instant,
457 current_batch_size: &AtomicUsize,
458 config: &BatchConfig,
459 ) -> OTelSdkResult
460 where
461 E: SpanExporter + Send + Sync + 'static,
462 {
463 while let Ok(span) = spans_receiver.try_recv() {
465 spans.push(span);
466 if spans.len() == config.max_export_batch_size {
467 break;
468 }
469 }
470
471 let count_of_spans = spans.len(); let result = Self::export_batch_sync(exporter, spans, last_export_time); current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
475 result
476 }
477
478 #[allow(clippy::vec_box)]
479 fn export_batch_sync<E>(
480 exporter: &mut E,
481 batch: &mut Vec<SpanData>,
482 last_export_time: &mut Instant,
483 ) -> OTelSdkResult
484 where
485 E: SpanExporter + Send + Sync + 'static,
486 {
487 *last_export_time = Instant::now();
488
489 if batch.is_empty() {
490 return OTelSdkResult::Ok(());
491 }
492
493 let export = exporter.export(batch.split_off(0));
494 let export_result = futures_executor::block_on(export);
495
496 match export_result {
497 Ok(_) => OTelSdkResult::Ok(()),
498 Err(err) => {
499 otel_error!(
500 name: "BatchSpanProcessor.ExportError",
501 error = format!("{}", err)
502 );
503 Err(OTelSdkError::InternalFailure(err.to_string()))
504 }
505 }
506 }
507}
508
509impl SpanProcessor for BatchSpanProcessor {
510 fn on_start(&self, _span: &mut Span, _cx: &Context) {
512 }
514
515 fn on_end(&self, span: SpanData) {
517 if self.is_shutdown.load(Ordering::Relaxed) {
518 otel_warn!(
520 name: "BatchSpanProcessor.Emit.ProcessorShutdown",
521 message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
522 );
523 return;
524 }
525 let result = self.span_sender.try_send(span);
526
527 if result.is_err() {
528 if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
531 otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
532 message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
533 }
534 }
535 if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
538 {
539 if !self.export_span_message_sent.load(Ordering::Relaxed) {
544 if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
549 match self.message_sender.try_send(BatchMessage::ExportSpan(
550 self.export_span_message_sent.clone(),
551 )) {
552 Ok(_) => {
553 }
555 Err(_err) => {
556 self.export_span_message_sent
559 .store(false, Ordering::Relaxed);
560 }
561 }
562 }
563 }
564 }
565 }
566
567 fn force_flush(&self) -> OTelSdkResult {
569 if self.is_shutdown.load(Ordering::Relaxed) {
570 return Err(OTelSdkError::AlreadyShutdown);
571 }
572 let (sender, receiver) = sync_channel(1);
573 self.message_sender
574 .try_send(BatchMessage::ForceFlush(sender))
575 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
576
577 receiver
578 .recv_timeout(self.forceflush_timeout)
579 .map_err(|_| OTelSdkError::Timeout(self.forceflush_timeout))?
580 }
581
582 fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
584 if self.is_shutdown.swap(true, Ordering::Relaxed) {
585 return Err(OTelSdkError::AlreadyShutdown);
586 }
587 let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
588 let max_queue_size = self.max_queue_size;
589 if dropped_spans > 0 {
590 otel_warn!(
591 name: "BatchSpanProcessor.SpansDropped",
592 dropped_span_count = dropped_spans,
593 max_queue_size = max_queue_size,
594 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
595 );
596 }
597
598 let (sender, receiver) = sync_channel(1);
599 self.message_sender
600 .try_send(BatchMessage::Shutdown(sender))
601 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
602
603 let result = receiver
604 .recv_timeout(timeout)
605 .map_err(|_| OTelSdkError::Timeout(timeout))?;
606 if let Some(handle) = self.handle.lock().unwrap().take() {
607 if let Err(err) = handle.join() {
608 return Err(OTelSdkError::InternalFailure(format!(
609 "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}",
610 err
611 )));
612 }
613 }
614 result
615 }
616
617 fn set_resource(&mut self, resource: &Resource) {
619 let resource = Arc::new(resource.clone());
620 let _ = self
621 .message_sender
622 .try_send(BatchMessage::SetResource(resource));
623 }
624}
625
626#[derive(Debug, Default)]
628pub struct BatchSpanProcessorBuilder<E>
629where
630 E: SpanExporter + Send + 'static,
631{
632 exporter: E,
633 config: BatchConfig,
634}
635
636impl<E> BatchSpanProcessorBuilder<E>
637where
638 E: SpanExporter + Send + 'static,
639{
640 pub fn with_batch_config(self, config: BatchConfig) -> Self {
642 BatchSpanProcessorBuilder { config, ..self }
643 }
644
645 pub fn build(self) -> BatchSpanProcessor {
647 BatchSpanProcessor::new(self.exporter, self.config)
648 }
649}
650
651#[derive(Debug)]
654pub struct BatchConfig {
655 pub(crate) max_queue_size: usize,
658
659 pub(crate) scheduled_delay: Duration,
662
663 #[allow(dead_code)]
664 pub(crate) max_export_batch_size: usize,
669
670 #[allow(dead_code)]
671 pub(crate) max_export_timeout: Duration,
673
674 #[allow(dead_code)]
675 pub(crate) max_concurrent_exports: usize,
681}
682
683impl Default for BatchConfig {
684 fn default() -> Self {
685 BatchConfigBuilder::default().build()
686 }
687}
688
689#[derive(Debug)]
691pub struct BatchConfigBuilder {
692 max_queue_size: usize,
693 scheduled_delay: Duration,
694 max_export_batch_size: usize,
695 max_export_timeout: Duration,
696 max_concurrent_exports: usize,
697}
698
699impl Default for BatchConfigBuilder {
700 fn default() -> Self {
711 BatchConfigBuilder {
712 max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
713 scheduled_delay: OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
714 max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
715 max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
716 max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
717 }
718 .init_from_env_vars()
719 }
720}
721
722impl BatchConfigBuilder {
723 pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
732 self.max_queue_size = max_queue_size;
733 self
734 }
735
736 pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
746 self.max_export_batch_size = max_export_batch_size;
747 self
748 }
749
750 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
751 pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
763 self.max_concurrent_exports = max_concurrent_exports;
764 self
765 }
766
767 pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
775 self.scheduled_delay = scheduled_delay;
776 self
777 }
778
779 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
787 pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
788 self.max_export_timeout = max_export_timeout;
789 self
790 }
791
792 pub fn build(self) -> BatchConfig {
795 let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
798
799 BatchConfig {
800 max_queue_size: self.max_queue_size,
801 scheduled_delay: self.scheduled_delay,
802 max_export_timeout: self.max_export_timeout,
803 max_concurrent_exports: self.max_concurrent_exports,
804 max_export_batch_size,
805 }
806 }
807
808 fn init_from_env_vars(mut self) -> Self {
809 if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
810 .ok()
811 .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
812 {
813 self.max_concurrent_exports = max_concurrent_exports;
814 }
815
816 if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
817 .ok()
818 .and_then(|queue_size| usize::from_str(&queue_size).ok())
819 {
820 self.max_queue_size = max_queue_size;
821 }
822
823 if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
824 .ok()
825 .and_then(|delay| u64::from_str(&delay).ok())
826 {
827 self.scheduled_delay = Duration::from_millis(scheduled_delay);
828 }
829
830 if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
831 .ok()
832 .and_then(|batch_size| usize::from_str(&batch_size).ok())
833 {
834 self.max_export_batch_size = max_export_batch_size;
835 }
836
837 if self.max_export_batch_size > self.max_queue_size {
840 self.max_export_batch_size = self.max_queue_size;
841 }
842
843 if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
844 .ok()
845 .and_then(|timeout| u64::from_str(&timeout).ok())
846 {
847 self.max_export_timeout = Duration::from_millis(max_export_timeout);
848 }
849
850 self
851 }
852}
853
854#[cfg(all(test, feature = "testing", feature = "trace"))]
855mod tests {
856 use super::{
858 BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
859 OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
860 OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
861 };
862 use crate::error::OTelSdkResult;
863 use crate::testing::trace::new_test_export_span_data;
864 use crate::trace::span_processor::{
865 OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
866 OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
867 };
868 use crate::trace::InMemorySpanExporterBuilder;
869 use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
870 use crate::trace::{SpanData, SpanExporter};
871 use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
872 use std::fmt::Debug;
873 use std::time::Duration;
874
875 #[test]
876 fn simple_span_processor_on_end_calls_export() {
877 let exporter = InMemorySpanExporterBuilder::new().build();
878 let processor = SimpleSpanProcessor::new(exporter.clone());
879 let span_data = new_test_export_span_data();
880 processor.on_end(span_data.clone());
881 assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
882 let _result = processor.shutdown();
883 }
884
885 #[test]
886 fn simple_span_processor_on_end_skips_export_if_not_sampled() {
887 let exporter = InMemorySpanExporterBuilder::new().build();
888 let processor = SimpleSpanProcessor::new(exporter.clone());
889 let unsampled = SpanData {
890 span_context: SpanContext::empty_context(),
891 parent_span_id: SpanId::INVALID,
892 span_kind: SpanKind::Internal,
893 name: "opentelemetry".into(),
894 start_time: opentelemetry::time::now(),
895 end_time: opentelemetry::time::now(),
896 attributes: Vec::new(),
897 dropped_attributes_count: 0,
898 events: SpanEvents::default(),
899 links: SpanLinks::default(),
900 status: Status::Unset,
901 instrumentation_scope: Default::default(),
902 };
903 processor.on_end(unsampled);
904 assert!(exporter.get_finished_spans().unwrap().is_empty());
905 }
906
907 #[test]
908 fn simple_span_processor_shutdown_calls_shutdown() {
909 let exporter = InMemorySpanExporterBuilder::new().build();
910 let processor = SimpleSpanProcessor::new(exporter.clone());
911 let span_data = new_test_export_span_data();
912 processor.on_end(span_data.clone());
913 assert!(!exporter.get_finished_spans().unwrap().is_empty());
914 let _result = processor.shutdown();
915 assert!(exporter.get_finished_spans().unwrap().is_empty());
917 }
918
919 #[test]
920 fn test_default_const_values() {
921 assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
922 assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
923 assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
924 assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT.as_millis(), 5000);
925 assert_eq!(
926 OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
927 "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
928 );
929 assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
930 assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
931 assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT.as_millis(), 30000);
932 }
933
934 #[test]
935 fn test_default_batch_config_adheres_to_specification() {
936 let env_vars = vec![
937 OTEL_BSP_SCHEDULE_DELAY,
938 OTEL_BSP_EXPORT_TIMEOUT,
939 OTEL_BSP_MAX_QUEUE_SIZE,
940 OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
941 OTEL_BSP_MAX_CONCURRENT_EXPORTS,
942 ];
943
944 let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
945
946 assert_eq!(
947 config.max_concurrent_exports,
948 OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
949 );
950 assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
951 assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
952 assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
953 assert_eq!(
954 config.max_export_batch_size,
955 OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
956 );
957 }
958
959 #[test]
960 fn test_code_based_config_overrides_env_vars() {
961 let env_vars = vec![
962 (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
963 (OTEL_BSP_MAX_CONCURRENT_EXPORTS, Some("5")),
964 (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
965 (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
966 (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
967 ];
968
969 temp_env::with_vars(env_vars, || {
970 let config = BatchConfigBuilder::default()
971 .with_max_export_batch_size(512)
972 .with_max_queue_size(2048)
973 .with_scheduled_delay(Duration::from_millis(1000));
974 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
975 let config = {
976 config
977 .with_max_concurrent_exports(10)
978 .with_max_export_timeout(Duration::from_millis(2000))
979 };
980 let config = config.build();
981
982 assert_eq!(config.max_export_batch_size, 512);
983 assert_eq!(config.max_queue_size, 2048);
984 assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
985 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
986 {
987 assert_eq!(config.max_concurrent_exports, 10);
988 assert_eq!(config.max_export_timeout, Duration::from_millis(2000));
989 }
990 });
991 }
992
993 #[test]
994 fn test_batch_config_configurable_by_env_vars() {
995 let env_vars = vec![
996 (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
997 (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
998 (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
999 (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1000 ];
1001
1002 let config = temp_env::with_vars(env_vars, BatchConfig::default);
1003
1004 assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
1005 assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
1006 assert_eq!(config.max_queue_size, 4096);
1007 assert_eq!(config.max_export_batch_size, 1024);
1008 }
1009
1010 #[test]
1011 fn test_batch_config_max_export_batch_size_validation() {
1012 let env_vars = vec![
1013 (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
1014 (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
1015 ];
1016
1017 let config = temp_env::with_vars(env_vars, BatchConfig::default);
1018
1019 assert_eq!(config.max_queue_size, 256);
1020 assert_eq!(config.max_export_batch_size, 256);
1021 assert_eq!(config.scheduled_delay, OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
1022 assert_eq!(config.max_export_timeout, OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
1023 }
1024
1025 #[test]
1026 fn test_batch_config_with_fields() {
1027 let batch = BatchConfigBuilder::default()
1028 .with_max_export_batch_size(10)
1029 .with_scheduled_delay(Duration::from_millis(10))
1030 .with_max_queue_size(10);
1031 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1032 let batch = {
1033 batch
1034 .with_max_concurrent_exports(10)
1035 .with_max_export_timeout(Duration::from_millis(10))
1036 };
1037 let batch = batch.build();
1038 assert_eq!(batch.max_export_batch_size, 10);
1039 assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
1040 assert_eq!(batch.max_queue_size, 10);
1041 #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
1042 {
1043 assert_eq!(batch.max_concurrent_exports, 10);
1044 assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
1045 }
1046 }
1047
1048 fn create_test_span(name: &str) -> SpanData {
1050 SpanData {
1051 span_context: SpanContext::empty_context(),
1052 parent_span_id: SpanId::INVALID,
1053 span_kind: SpanKind::Internal,
1054 name: name.to_string().into(),
1055 start_time: opentelemetry::time::now(),
1056 end_time: opentelemetry::time::now(),
1057 attributes: Vec::new(),
1058 dropped_attributes_count: 0,
1059 events: SpanEvents::default(),
1060 links: SpanLinks::default(),
1061 status: Status::Unset,
1062 instrumentation_scope: Default::default(),
1063 }
1064 }
1065
1066 use crate::Resource;
1067 use opentelemetry::{Key, KeyValue, Value};
1068 use std::sync::{atomic::Ordering, Arc, Mutex};
1069
1070 #[derive(Debug)]
1072 struct MockSpanExporter {
1073 exported_spans: Arc<Mutex<Vec<SpanData>>>,
1074 exported_resource: Arc<Mutex<Option<Resource>>>,
1075 }
1076
1077 impl MockSpanExporter {
1078 fn new() -> Self {
1079 Self {
1080 exported_spans: Arc::new(Mutex::new(Vec::new())),
1081 exported_resource: Arc::new(Mutex::new(None)),
1082 }
1083 }
1084 }
1085
1086 impl SpanExporter for MockSpanExporter {
1087 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
1088 let exported_spans = self.exported_spans.clone();
1089 exported_spans.lock().unwrap().extend(batch);
1090 Ok(())
1091 }
1092
1093 fn shutdown(&mut self) -> OTelSdkResult {
1094 Ok(())
1095 }
1096 fn set_resource(&mut self, resource: &Resource) {
1097 let mut exported_resource = self.exported_resource.lock().unwrap();
1098 *exported_resource = Some(resource.clone());
1099 }
1100 }
1101
1102 #[test]
1103 fn batchspanprocessor_handles_on_end() {
1104 let exporter = MockSpanExporter::new();
1105 let exporter_shared = exporter.exported_spans.clone();
1106 let config = BatchConfigBuilder::default()
1107 .with_max_queue_size(10)
1108 .with_max_export_batch_size(10)
1109 .with_scheduled_delay(Duration::from_secs(5))
1110 .build();
1111 let processor = BatchSpanProcessor::new(exporter, config);
1112
1113 let test_span = create_test_span("test_span");
1114 processor.on_end(test_span.clone());
1115
1116 std::thread::sleep(Duration::from_secs(6));
1118
1119 let exported_spans = exporter_shared.lock().unwrap();
1120 assert_eq!(exported_spans.len(), 1);
1121 assert_eq!(exported_spans[0].name, "test_span");
1122 }
1123
1124 #[test]
1125 fn batchspanprocessor_force_flush() {
1126 let exporter = MockSpanExporter::new();
1127 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1129 .with_max_queue_size(10)
1130 .with_max_export_batch_size(10)
1131 .with_scheduled_delay(Duration::from_secs(5))
1132 .build();
1133 let processor = BatchSpanProcessor::new(exporter, config);
1134
1135 let test_span = create_test_span("force_flush_span");
1137 processor.on_end(test_span.clone());
1138
1139 let flush_result = processor.force_flush();
1141 assert!(flush_result.is_ok(), "Force flush failed unexpectedly");
1142
1143 let exported_spans = exporter_shared.lock().unwrap();
1145 assert_eq!(
1146 exported_spans.len(),
1147 1,
1148 "Unexpected number of exported spans"
1149 );
1150 assert_eq!(exported_spans[0].name, "force_flush_span");
1151 }
1152
1153 #[test]
1154 fn batchspanprocessor_shutdown() {
1155 let exporter = MockSpanExporter::new();
1156 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1158 .with_max_queue_size(10)
1159 .with_max_export_batch_size(10)
1160 .with_scheduled_delay(Duration::from_secs(5))
1161 .build();
1162 let processor = BatchSpanProcessor::new(exporter, config);
1163
1164 let test_span = create_test_span("shutdown_span");
1166 processor.on_end(test_span.clone());
1167
1168 let shutdown_result = processor.shutdown();
1170 assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly");
1171
1172 let exported_spans = exporter_shared.lock().unwrap();
1174 assert_eq!(
1175 exported_spans.len(),
1176 1,
1177 "Unexpected number of exported spans"
1178 );
1179 assert_eq!(exported_spans[0].name, "shutdown_span");
1180
1181 let second_shutdown_result = processor.shutdown();
1183 assert!(
1184 second_shutdown_result.is_err(),
1185 "Shutdown should fail when called a second time"
1186 );
1187 }
1188
1189 #[test]
1190 fn batchspanprocessor_handles_dropped_spans() {
1191 let exporter = MockSpanExporter::new();
1192 let exporter_shared = exporter.exported_spans.clone(); let config = BatchConfigBuilder::default()
1194 .with_max_queue_size(2) .with_scheduled_delay(Duration::from_secs(5))
1196 .build();
1197 let processor = BatchSpanProcessor::new(exporter, config);
1198
1199 let span1 = create_test_span("span1");
1201 let span2 = create_test_span("span2");
1202 let span3 = create_test_span("span3"); processor.on_end(span1.clone());
1205 processor.on_end(span2.clone());
1206 processor.on_end(span3.clone()); std::thread::sleep(Duration::from_secs(3));
1210
1211 let exported_spans = exporter_shared.lock().unwrap();
1212
1213 assert_eq!(
1215 exported_spans.len(),
1216 2,
1217 "Unexpected number of exported spans"
1218 );
1219 assert!(exported_spans.iter().any(|s| s.name == "span1"));
1220 assert!(exported_spans.iter().any(|s| s.name == "span2"));
1221
1222 assert!(
1224 !exported_spans.iter().any(|s| s.name == "span3"),
1225 "Span3 should have been dropped"
1226 );
1227
1228 let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
1230 assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
1231 }
1232
1233 #[test]
1234 fn validate_span_attributes_exported_correctly() {
1235 let exporter = MockSpanExporter::new();
1236 let exporter_shared = exporter.exported_spans.clone();
1237 let config = BatchConfigBuilder::default().build();
1238 let processor = BatchSpanProcessor::new(exporter, config);
1239
1240 let mut span_data = create_test_span("attribute_validation");
1242 span_data.attributes = vec![
1243 KeyValue::new("key1", "value1"),
1244 KeyValue::new("key2", "value2"),
1245 ];
1246 processor.on_end(span_data.clone());
1247
1248 let _ = processor.force_flush();
1250
1251 let exported_spans = exporter_shared.lock().unwrap();
1253 assert_eq!(exported_spans.len(), 1);
1254 let exported_span = &exported_spans[0];
1255 assert!(exported_span
1256 .attributes
1257 .contains(&KeyValue::new("key1", "value1")));
1258 assert!(exported_span
1259 .attributes
1260 .contains(&KeyValue::new("key2", "value2")));
1261 }
1262
1263 #[test]
1264 fn batchspanprocessor_sets_and_exports_with_resource() {
1265 let exporter = MockSpanExporter::new();
1266 let exporter_shared = exporter.exported_spans.clone();
1267 let resource_shared = exporter.exported_resource.clone();
1268 let config = BatchConfigBuilder::default().build();
1269 let mut processor = BatchSpanProcessor::new(exporter, config);
1270
1271 let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
1273 processor.set_resource(&resource);
1274
1275 let test_span = create_test_span("resource_test");
1277 processor.on_end(test_span.clone());
1278
1279 let _ = processor.force_flush();
1281
1282 let exported_spans = exporter_shared.lock().unwrap();
1284 assert_eq!(exported_spans.len(), 1);
1285
1286 let exported_resource = resource_shared.lock().unwrap();
1288 assert!(exported_resource.is_some());
1289 assert_eq!(
1290 exported_resource
1291 .as_ref()
1292 .unwrap()
1293 .get(&Key::new("service.name")),
1294 Some(Value::from("test_service"))
1295 );
1296 }
1297
1298 #[tokio::test(flavor = "current_thread")]
1299 async fn test_batch_processor_current_thread_runtime() {
1300 let exporter = MockSpanExporter::new();
1301 let exporter_shared = exporter.exported_spans.clone();
1302
1303 let config = BatchConfigBuilder::default()
1304 .with_max_queue_size(5)
1305 .with_max_export_batch_size(3)
1306 .build();
1307
1308 let processor = BatchSpanProcessor::new(exporter, config);
1309
1310 for _ in 0..4 {
1311 let span = new_test_export_span_data();
1312 processor.on_end(span);
1313 }
1314
1315 processor.force_flush().unwrap();
1316
1317 let exported_spans = exporter_shared.lock().unwrap();
1318 assert_eq!(exported_spans.len(), 4);
1319 }
1320
1321 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1322 async fn test_batch_processor_multi_thread_count_1_runtime() {
1323 let exporter = MockSpanExporter::new();
1324 let exporter_shared = exporter.exported_spans.clone();
1325
1326 let config = BatchConfigBuilder::default()
1327 .with_max_queue_size(5)
1328 .with_max_export_batch_size(3)
1329 .build();
1330
1331 let processor = BatchSpanProcessor::new(exporter, config);
1332
1333 for _ in 0..4 {
1334 let span = new_test_export_span_data();
1335 processor.on_end(span);
1336 }
1337
1338 processor.force_flush().unwrap();
1339
1340 let exported_spans = exporter_shared.lock().unwrap();
1341 assert_eq!(exported_spans.len(), 4);
1342 }
1343
1344 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1345 async fn test_batch_processor_multi_thread() {
1346 let exporter = MockSpanExporter::new();
1347 let exporter_shared = exporter.exported_spans.clone();
1348
1349 let config = BatchConfigBuilder::default()
1350 .with_max_queue_size(20)
1351 .with_max_export_batch_size(5)
1352 .build();
1353
1354 let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1356
1357 let mut handles = vec![];
1358 for _ in 0..10 {
1359 let processor_clone = Arc::clone(&processor);
1360 let handle = tokio::spawn(async move {
1361 let span = new_test_export_span_data();
1362 processor_clone.on_end(span);
1363 });
1364 handles.push(handle);
1365 }
1366
1367 for handle in handles {
1368 handle.await.unwrap();
1369 }
1370
1371 processor.force_flush().unwrap();
1372
1373 let exported_spans = exporter_shared.lock().unwrap();
1375 assert_eq!(exported_spans.len(), 10);
1376 }
1377}