1use super::IdGenerator;
2use crate::error::{OTelSdkError, OTelSdkResult};
3use crate::trace::{
68    BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SdkTracer, SimpleSpanProcessor,
69    SpanLimits,
70};
71use crate::Resource;
72use crate::{trace::SpanExporter, trace::SpanProcessor};
73use opentelemetry::otel_debug;
74use opentelemetry::{otel_info, InstrumentationScope};
75use std::borrow::Cow;
76use std::sync::atomic::{AtomicBool, Ordering};
77use std::sync::{Arc, OnceLock};
78use std::time::Duration;
79
80static PROVIDER_RESOURCE: OnceLock<Resource> = OnceLock::new();
81
82static NOOP_TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
85#[inline]
86fn noop_tracer_provider() -> &'static SdkTracerProvider {
87    NOOP_TRACER_PROVIDER.get_or_init(|| {
88        SdkTracerProvider {
89            inner: Arc::new(TracerProviderInner {
90                processors: Vec::new(),
91                config: Config {
92                    sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
94                    id_generator: Box::<RandomIdGenerator>::default(),
95                    span_limits: SpanLimits::default(),
96                    resource: Cow::Owned(Resource::empty()),
97                },
98                is_shutdown: AtomicBool::new(true),
99            }),
100        }
101    })
102}
103
104#[derive(Debug)]
106pub(crate) struct TracerProviderInner {
107    processors: Vec<Box<dyn SpanProcessor>>,
108    config: crate::trace::Config,
109    is_shutdown: AtomicBool,
110}
111
112impl TracerProviderInner {
113    pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
116        let mut results = vec![];
117        for processor in &self.processors {
118            let result = processor.shutdown_with_timeout(timeout);
119            if let Err(err) = &result {
120                otel_debug!(name: "TracerProvider.Drop.ShutdownError",
125                        error = format!("{err}"));
126            }
127            results.push(result);
128        }
129        results
130    }
131    pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
133        self.shutdown_with_timeout(Duration::from_secs(5))
134    }
135}
136
137impl Drop for TracerProviderInner {
138    fn drop(&mut self) {
139        if !self.is_shutdown.load(Ordering::Relaxed) {
140            let _ = self.shutdown(); } else {
142            otel_debug!(
143                name: "TracerProvider.Drop.AlreadyShutdown",
144                message = "TracerProvider was already shut down; drop will not attempt shutdown again."
145            );
146        }
147    }
148}
149
150#[derive(Clone, Debug)]
158pub struct SdkTracerProvider {
159    inner: Arc<TracerProviderInner>,
160}
161
162impl Default for SdkTracerProvider {
163    fn default() -> Self {
164        SdkTracerProvider::builder().build()
165    }
166}
167
168impl SdkTracerProvider {
169    pub(crate) fn new(inner: TracerProviderInner) -> Self {
171        SdkTracerProvider {
172            inner: Arc::new(inner),
173        }
174    }
175
176    pub fn builder() -> TracerProviderBuilder {
178        TracerProviderBuilder::default()
179    }
180
181    pub(crate) fn span_processors(&self) -> &[Box<dyn SpanProcessor>] {
183        &self.inner.processors
184    }
185
186    pub(crate) fn config(&self) -> &crate::trace::Config {
188        &self.inner.config
189    }
190
191    pub(crate) fn is_shutdown(&self) -> bool {
194        self.inner.is_shutdown.load(Ordering::Relaxed)
195    }
196
197    pub fn force_flush(&self) -> OTelSdkResult {
231        let result: Vec<_> = self
232            .span_processors()
233            .iter()
234            .map(|processor| processor.force_flush())
235            .collect();
236        if result.iter().all(|r| r.is_ok()) {
237            Ok(())
238        } else {
239            Err(OTelSdkError::InternalFailure(format!("errs: {:?}", result)))
240        }
241    }
242
243    pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
247        if self
248            .inner
249            .is_shutdown
250            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
251            .is_ok()
252        {
253            let results = self.inner.shutdown_with_timeout(timeout);
255
256            if results.iter().all(|res| res.is_ok()) {
257                Ok(())
258            } else {
259                Err(OTelSdkError::InternalFailure(format!(
260                    "Shutdown errors: {:?}",
261                    results
262                        .into_iter()
263                        .filter_map(Result::err)
264                        .collect::<Vec<_>>() )))
266            }
267        } else {
268            Err(OTelSdkError::AlreadyShutdown)
269        }
270    }
271
272    pub fn shutdown(&self) -> OTelSdkResult {
274        self.shutdown_with_timeout(Duration::from_secs(5))
275    }
276}
277
278impl opentelemetry::trace::TracerProvider for SdkTracerProvider {
279    type Tracer = SdkTracer;
281
282    fn tracer(&self, name: impl Into<Cow<'static, str>>) -> Self::Tracer {
283        let scope = InstrumentationScope::builder(name).build();
284        self.tracer_with_scope(scope)
285    }
286
287    fn tracer_with_scope(&self, scope: InstrumentationScope) -> Self::Tracer {
288        if self.inner.is_shutdown.load(Ordering::Relaxed) {
289            return SdkTracer::new(scope, noop_tracer_provider().clone());
290        }
291        if scope.name().is_empty() {
292            otel_info!(name: "TracerNameEmpty",  message = "Tracer name is empty; consider providing a meaningful name. Tracer will function normally and the provided name will be used as-is.");
293        };
294        SdkTracer::new(scope, self.clone())
295    }
296}
297
298#[derive(Debug, Default)]
300pub struct TracerProviderBuilder {
301    processors: Vec<Box<dyn SpanProcessor>>,
302    config: crate::trace::Config,
303    resource: Option<Resource>,
304}
305
306impl TracerProviderBuilder {
307    pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
319        let simple = SimpleSpanProcessor::new(exporter);
320        self.with_span_processor(simple)
321    }
322
323    pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
335        let batch = BatchSpanProcessor::builder(exporter).build();
336        self.with_span_processor(batch)
337    }
338
339    pub fn with_span_processor<T: SpanProcessor + 'static>(self, processor: T) -> Self {
351        let mut processors = self.processors;
352        processors.push(Box::new(processor));
353
354        TracerProviderBuilder { processors, ..self }
355    }
356
357    pub fn with_sampler<T: crate::trace::ShouldSample + 'static>(mut self, sampler: T) -> Self {
359        self.config.sampler = Box::new(sampler);
360        self
361    }
362
363    pub fn with_id_generator<T: IdGenerator + 'static>(mut self, id_generator: T) -> Self {
365        self.config.id_generator = Box::new(id_generator);
366        self
367    }
368
369    pub fn with_max_events_per_span(mut self, max_events: u32) -> Self {
371        self.config.span_limits.max_events_per_span = max_events;
372        self
373    }
374
375    pub fn with_max_attributes_per_span(mut self, max_attributes: u32) -> Self {
377        self.config.span_limits.max_attributes_per_span = max_attributes;
378        self
379    }
380
381    pub fn with_max_links_per_span(mut self, max_links: u32) -> Self {
383        self.config.span_limits.max_links_per_span = max_links;
384        self
385    }
386
387    pub fn with_max_attributes_per_event(mut self, max_attributes: u32) -> Self {
389        self.config.span_limits.max_attributes_per_event = max_attributes;
390        self
391    }
392
393    pub fn with_max_attributes_per_link(mut self, max_attributes: u32) -> Self {
395        self.config.span_limits.max_attributes_per_link = max_attributes;
396        self
397    }
398
399    pub fn with_span_limits(mut self, span_limits: SpanLimits) -> Self {
401        self.config.span_limits = span_limits;
402        self
403    }
404
405    pub fn with_resource(self, resource: Resource) -> Self {
417        let resource = match self.resource {
418            Some(existing) => Some(existing.merge(&resource)),
419            None => Some(resource),
420        };
421
422        TracerProviderBuilder { resource, ..self }
423    }
424
425    pub fn build(self) -> SdkTracerProvider {
427        let mut config = self.config;
428
429        if let Some(resource) = self.resource {
431            config.resource = Cow::Owned(resource);
432        };
433
434        if matches!(config.resource, Cow::Owned(_)) {
441            config.resource =
442                match PROVIDER_RESOURCE.get_or_init(|| config.resource.clone().into_owned()) {
443                    static_resource if *static_resource == *config.resource.as_ref() => {
444                        Cow::Borrowed(static_resource)
445                    }
446                    _ => config.resource, };
448        }
449
450        let mut processors = self.processors;
452
453        for p in &mut processors {
455            p.set_resource(config.resource.as_ref());
456        }
457
458        let is_shutdown = AtomicBool::new(false);
459        SdkTracerProvider::new(TracerProviderInner {
460            processors,
461            config,
462            is_shutdown,
463        })
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use crate::error::{OTelSdkError, OTelSdkResult};
470    use crate::resource::{
471        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
472    };
473    use crate::trace::provider::TracerProviderInner;
474    use crate::trace::{Config, Span, SpanProcessor};
475    use crate::trace::{SdkTracerProvider, SpanData};
476    use crate::Resource;
477    use opentelemetry::trace::{Tracer, TracerProvider};
478    use opentelemetry::{Context, Key, KeyValue, Value};
479
480    use std::env;
481    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
482    use std::sync::Arc;
483    use std::time::Duration;
484
485    #[derive(Default, Debug)]
487    struct AssertInfo {
488        started_span: AtomicU32,
489        is_shutdown: AtomicBool,
490    }
491
492    #[derive(Default, Debug, Clone)]
493    struct SharedAssertInfo(Arc<AssertInfo>);
494
495    impl SharedAssertInfo {
496        fn started_span_count(&self, count: u32) -> bool {
497            self.0.started_span.load(Ordering::SeqCst) == count
498        }
499    }
500
501    #[derive(Debug)]
502    struct TestSpanProcessor {
503        success: bool,
504        assert_info: SharedAssertInfo,
505    }
506
507    impl TestSpanProcessor {
508        fn new(success: bool) -> TestSpanProcessor {
509            TestSpanProcessor {
510                success,
511                assert_info: SharedAssertInfo::default(),
512            }
513        }
514
515        fn assert_info(&self) -> SharedAssertInfo {
517            self.assert_info.clone()
518        }
519    }
520
521    impl SpanProcessor for TestSpanProcessor {
522        fn on_start(&self, _span: &mut Span, _cx: &Context) {
523            self.assert_info
524                .0
525                .started_span
526                .fetch_add(1, Ordering::SeqCst);
527        }
528
529        fn on_end(&self, _span: SpanData) {
530            }
532
533        fn force_flush(&self) -> OTelSdkResult {
534            if self.success {
535                Ok(())
536            } else {
537                Err(OTelSdkError::InternalFailure("cannot export".into()))
538            }
539        }
540
541        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
542            if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
543                Ok(())
544            } else {
545                let _ = self.assert_info.0.is_shutdown.compare_exchange(
546                    false,
547                    true,
548                    Ordering::SeqCst,
549                    Ordering::SeqCst,
550                );
551                self.force_flush()
552            }
553        }
554    }
555
556    #[test]
557    fn test_force_flush() {
558        let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
559            processors: vec![
560                Box::from(TestSpanProcessor::new(true)),
561                Box::from(TestSpanProcessor::new(false)),
562            ],
563            config: Default::default(),
564            is_shutdown: AtomicBool::new(false),
565        });
566
567        let results = tracer_provider.force_flush();
568        assert!(results.is_err());
569    }
570
571    #[test]
572    fn test_tracer_provider_default_resource() {
573        let assert_resource = |provider: &super::SdkTracerProvider,
574                               resource_key: &'static str,
575                               expect: Option<&'static str>| {
576            assert_eq!(
577                provider
578                    .config()
579                    .resource
580                    .get(&Key::from_static_str(resource_key))
581                    .map(|v| v.to_string()),
582                expect.map(|s| s.to_string())
583            );
584        };
585        let assert_telemetry_resource = |provider: &super::SdkTracerProvider| {
586            assert_eq!(
587                provider
588                    .config()
589                    .resource
590                    .get(&TELEMETRY_SDK_LANGUAGE.into()),
591                Some(Value::from("rust"))
592            );
593            assert_eq!(
594                provider.config().resource.get(&TELEMETRY_SDK_NAME.into()),
595                Some(Value::from("opentelemetry"))
596            );
597            assert_eq!(
598                provider
599                    .config()
600                    .resource
601                    .get(&TELEMETRY_SDK_VERSION.into()),
602                Some(Value::from(env!("CARGO_PKG_VERSION")))
603            );
604        };
605
606        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
608            let default_config_provider = super::SdkTracerProvider::builder().build();
609            assert_resource(
610                &default_config_provider,
611                SERVICE_NAME,
612                Some("unknown_service"),
613            );
614            assert_telemetry_resource(&default_config_provider);
615        });
616
617        let custom_config_provider = super::SdkTracerProvider::builder()
619            .with_resource(
620                Resource::builder_empty()
621                    .with_service_name("test_service")
622                    .build(),
623            )
624            .build();
625        assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
626        assert_eq!(custom_config_provider.config().resource.len(), 1);
627
628        temp_env::with_var(
630            "OTEL_RESOURCE_ATTRIBUTES",
631            Some("key1=value1, k2, k3=value2"),
632            || {
633                let env_resource_provider = super::SdkTracerProvider::builder().build();
634                assert_resource(
635                    &env_resource_provider,
636                    SERVICE_NAME,
637                    Some("unknown_service"),
638                );
639                assert_resource(&env_resource_provider, "key1", Some("value1"));
640                assert_resource(&env_resource_provider, "k3", Some("value2"));
641                assert_telemetry_resource(&env_resource_provider);
642                assert_eq!(env_resource_provider.config().resource.len(), 6);
643            },
644        );
645
646        temp_env::with_var(
648            "OTEL_RESOURCE_ATTRIBUTES",
649            Some("my-custom-key=env-val,k2=value2"),
650            || {
651                let user_provided_resource_config_provider = super::SdkTracerProvider::builder()
652                    .with_resource(
653                        Resource::builder()
654                            .with_attributes([
655                                KeyValue::new("my-custom-key", "my-custom-value"),
656                                KeyValue::new("my-custom-key2", "my-custom-value2"),
657                            ])
658                            .build(),
659                    )
660                    .build();
661                assert_resource(
662                    &user_provided_resource_config_provider,
663                    SERVICE_NAME,
664                    Some("unknown_service"),
665                );
666                assert_resource(
667                    &user_provided_resource_config_provider,
668                    "my-custom-key",
669                    Some("my-custom-value"),
670                );
671                assert_resource(
672                    &user_provided_resource_config_provider,
673                    "my-custom-key2",
674                    Some("my-custom-value2"),
675                );
676                assert_resource(
677                    &user_provided_resource_config_provider,
678                    "k2",
679                    Some("value2"),
680                );
681                assert_telemetry_resource(&user_provided_resource_config_provider);
682                assert_eq!(
683                    user_provided_resource_config_provider
684                        .config()
685                        .resource
686                        .len(),
687                    7
688                );
689            },
690        );
691
692        let no_service_name = super::SdkTracerProvider::builder()
694            .with_resource(Resource::empty())
695            .build();
696
697        assert_eq!(no_service_name.config().resource.len(), 0)
698    }
699
700    #[test]
701    fn test_shutdown_noops() {
702        let processor = TestSpanProcessor::new(false);
703        let assert_handle = processor.assert_info();
704        let tracer_provider = super::SdkTracerProvider::new(TracerProviderInner {
705            processors: vec![Box::from(processor)],
706            config: Default::default(),
707            is_shutdown: AtomicBool::new(false),
708        });
709
710        let test_tracer_1 = tracer_provider.tracer("test1");
711        let _ = test_tracer_1.start("test");
712
713        assert!(assert_handle.started_span_count(1));
714
715        let _ = test_tracer_1.start("test");
716
717        assert!(assert_handle.started_span_count(2));
718
719        let shutdown = |tracer_provider: super::SdkTracerProvider| {
720            let _ = tracer_provider.shutdown(); };
722
723        shutdown(tracer_provider.clone());
725
726        let noop_tracer = tracer_provider.tracer("noop");
728
729        let _ = noop_tracer.start("test");
731        assert!(assert_handle.started_span_count(2));
732        assert!(noop_tracer.provider().is_shutdown());
734
735        let _ = test_tracer_1.start("test");
737        assert!(assert_handle.started_span_count(2));
738
739        assert!(test_tracer_1.provider().is_shutdown());
741    }
742
743    #[test]
744    fn with_resource_multiple_calls_ensure_additive() {
745        let resource = SdkTracerProvider::builder()
746            .with_resource(Resource::new(vec![KeyValue::new("key1", "value1")]))
747            .with_resource(Resource::new(vec![KeyValue::new("key2", "value2")]))
748            .with_resource(
749                Resource::builder_empty()
750                    .with_schema_url(vec![], "http://example.com")
751                    .build(),
752            )
753            .with_resource(Resource::new(vec![KeyValue::new("key3", "value3")]))
754            .build()
755            .inner
756            .config
757            .resource
758            .clone()
759            .into_owned();
760
761        assert_eq!(
762            resource.get(&Key::from_static_str("key1")),
763            Some(Value::from("value1"))
764        );
765        assert_eq!(
766            resource.get(&Key::from_static_str("key2")),
767            Some(Value::from("value2"))
768        );
769        assert_eq!(
770            resource.get(&Key::from_static_str("key3")),
771            Some(Value::from("value3"))
772        );
773        assert_eq!(resource.schema_url(), Some("http://example.com"));
774    }
775
776    #[derive(Debug)]
777    struct CountingShutdownProcessor {
778        shutdown_count: Arc<AtomicU32>,
779    }
780
781    impl CountingShutdownProcessor {
782        fn new(shutdown_count: Arc<AtomicU32>) -> Self {
783            CountingShutdownProcessor { shutdown_count }
784        }
785    }
786
787    impl SpanProcessor for CountingShutdownProcessor {
788        fn on_start(&self, _span: &mut Span, _cx: &Context) {
789            }
791
792        fn on_end(&self, _span: SpanData) {
793            }
795
796        fn force_flush(&self) -> OTelSdkResult {
797            Ok(())
798        }
799
800        fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
801            self.shutdown_count.fetch_add(1, Ordering::SeqCst);
802            Ok(())
803        }
804    }
805
806    #[test]
807    fn drop_test_with_multiple_providers() {
808        let shutdown_count = Arc::new(AtomicU32::new(0));
809
810        {
811            let shared_inner = Arc::new(TracerProviderInner {
813                processors: vec![Box::new(CountingShutdownProcessor::new(
814                    shutdown_count.clone(),
815                ))],
816                config: Config::default(),
817                is_shutdown: AtomicBool::new(false),
818            });
819
820            {
821                let tracer_provider1 = super::SdkTracerProvider {
822                    inner: shared_inner.clone(),
823                };
824                let tracer_provider2 = super::SdkTracerProvider {
825                    inner: shared_inner.clone(),
826                };
827
828                let tracer1 = tracer_provider1.tracer("test-tracer1");
829                let tracer2 = tracer_provider2.tracer("test-tracer2");
830
831                let _span1 = tracer1.start("span1");
832                let _span2 = tracer2.start("span2");
833
834                }
837            assert_eq!(shutdown_count.load(Ordering::SeqCst), 0);
840        }
841        assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
843    }
844
845    #[test]
846    fn drop_after_shutdown_test_with_multiple_providers() {
847        let shutdown_count = Arc::new(AtomicU32::new(0));
848
849        let shared_inner = Arc::new(TracerProviderInner {
851            processors: vec![Box::new(CountingShutdownProcessor::new(
852                shutdown_count.clone(),
853            ))],
854            config: Config::default(),
855            is_shutdown: AtomicBool::new(false),
856        });
857
858        {
860            let tracer_provider1 = super::SdkTracerProvider {
861                inner: shared_inner.clone(),
862            };
863            let tracer_provider2 = super::SdkTracerProvider {
864                inner: shared_inner.clone(),
865            };
866
867            let shutdown_result = tracer_provider1.shutdown();
869            assert!(shutdown_result.is_ok());
870
871            assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
873
874            let shutdown_result2 = tracer_provider2.shutdown();
876            assert!(shutdown_result2.is_err());
877            assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
878
879            }
881
882        assert_eq!(shutdown_count.load(Ordering::SeqCst), 1);
884    }
885}