opentelemetry/context.rs
1//! Execution-scoped context propagation.
2//!
3//! The `context` module provides mechanisms for propagating values across API boundaries and between
4//! logically associated execution units. It enables cross-cutting concerns to access their data in-process
5//! using a shared context object.
6//!
7//! # Main Types
8//!
9//! - [`Context`]: An immutable, execution-scoped collection of values.
10//!
11
12use crate::otel_warn;
13#[cfg(feature = "trace")]
14use crate::trace::context::SynchronizedSpan;
15use std::any::{Any, TypeId};
16use std::cell::RefCell;
17use std::collections::HashMap;
18use std::fmt;
19use std::hash::{BuildHasherDefault, Hasher};
20use std::marker::PhantomData;
21use std::sync::Arc;
22
23#[cfg(feature = "futures")]
24mod future_ext;
25
26#[cfg(feature = "futures")]
27pub use future_ext::{FutureExt, WithContext};
28
29thread_local! {
30 static CURRENT_CONTEXT: RefCell<ContextStack> = RefCell::new(ContextStack::default());
31}
32
/// An execution-scoped collection of values.
///
/// A [`Context`] is a propagation mechanism which carries execution-scoped
/// values across API boundaries and between logically associated execution
/// units. Cross-cutting concerns access their data in-process using the same
/// shared context object.
///
/// [`Context`]s are immutable, and their write operations result in the creation
/// of a new context containing the original values and the new specified values.
///
/// ## Context state
///
/// Concerns can create and retrieve their local state in the current execution
/// state represented by a context through the [`get`] and [`with_value`]
/// methods. Values are keyed by their type, so it is recommended to use
/// application-specific types when storing new context values to avoid
/// unintentionally overwriting existing state.
///
/// ## Managing the current context
///
/// Contexts can be associated with the caller's current execution unit on a
/// given thread via the [`attach`] method, and previous contexts can be restored
/// by dropping the returned [`ContextGuard`]. Contexts can be nested, and will
/// restore their parent outer context when detached on drop. To access the
/// values of the context, a snapshot can be created via the [`Context::current`]
/// method.
///
/// [`Context::current`]: Context::current()
/// [`get`]: Context::get()
/// [`with_value`]: Context::with_value()
/// [`attach`]: Context::attach()
///
/// # Examples
///
/// ```
/// use opentelemetry::Context;
///
/// // Application-specific `a` and `b` values
/// #[derive(Debug, PartialEq)]
/// struct ValueA(&'static str);
/// #[derive(Debug, PartialEq)]
/// struct ValueB(u64);
///
/// let _outer_guard = Context::new().with_value(ValueA("a")).attach();
///
/// // Only value a has been set
/// let current = Context::current();
/// assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
/// assert_eq!(current.get::<ValueB>(), None);
///
/// {
///     let _inner_guard = Context::current_with_value(ValueB(42)).attach();
///     // Both values are set in inner context
///     let current = Context::current();
///     assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
///     assert_eq!(current.get::<ValueB>(), Some(&ValueB(42)));
/// }
///
/// // Resets to only the `a` value when inner guard is dropped
/// let current = Context::current();
/// assert_eq!(current.get::<ValueA>(), Some(&ValueA("a")));
/// assert_eq!(current.get::<ValueB>(), None);
/// ```
#[derive(Clone, Default)]
pub struct Context {
    /// Span carried by this context, if any. Only present when the `trace`
    /// feature is enabled.
    #[cfg(feature = "trace")]
    pub(crate) span: Option<Arc<SynchronizedSpan>>,
    /// Values stored through `with_value`, keyed by the `TypeId` of the
    /// value. `None` for a context that has never had a value added.
    entries: Option<Arc<EntryMap>>,
    /// Whether telemetry is suppressed in this context.
    suppress_telemetry: bool,
}
102
103type EntryMap = HashMap<TypeId, Arc<dyn Any + Sync + Send>, BuildHasherDefault<IdHasher>>;
104
105impl Context {
106 /// Creates an empty `Context`.
107 ///
108 /// The context is initially created with a capacity of 0, so it will not
109 /// allocate. Use [`with_value`] to create a new context that has entries.
110 ///
111 /// [`with_value`]: Context::with_value()
112 pub fn new() -> Self {
113 Context::default()
114 }
115
116 /// Returns an immutable snapshot of the current thread's context.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use opentelemetry::Context;
122 ///
123 /// #[derive(Debug, PartialEq)]
124 /// struct ValueA(&'static str);
125 ///
126 /// fn do_work() {
127 /// assert_eq!(Context::current().get(), Some(&ValueA("a")));
128 /// }
129 ///
130 /// let _guard = Context::new().with_value(ValueA("a")).attach();
131 /// do_work()
132 /// ```
133 pub fn current() -> Self {
134 Self::map_current(|cx| cx.clone())
135 }
136
137 /// Applies a function to the current context returning its value.
138 ///
139 /// This can be used to build higher performing algebraic expressions for
140 /// optionally creating a new context without the overhead of cloning the
141 /// current one and dropping it.
142 ///
143 /// Note: This function will panic if you attempt to attach another context
144 /// while the current one is still borrowed.
145 pub fn map_current<T>(f: impl FnOnce(&Context) -> T) -> T {
146 CURRENT_CONTEXT.with(|cx| cx.borrow().map_current_cx(f))
147 }
148
149 /// Returns a clone of the current thread's context with the given value.
150 ///
151 /// This is a more efficient form of `Context::current().with_value(value)`
152 /// as it avoids the intermediate context clone.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// use opentelemetry::Context;
158 ///
159 /// // Given some value types defined in your application
160 /// #[derive(Debug, PartialEq)]
161 /// struct ValueA(&'static str);
162 /// #[derive(Debug, PartialEq)]
163 /// struct ValueB(u64);
164 ///
165 /// // You can create and attach context with the first value set to "a"
166 /// let _guard = Context::new().with_value(ValueA("a")).attach();
167 ///
168 /// // And create another context based on the fist with a new value
169 /// let all_current_and_b = Context::current_with_value(ValueB(42));
170 ///
171 /// // The second context now contains all the current values and the addition
172 /// assert_eq!(all_current_and_b.get::<ValueA>(), Some(&ValueA("a")));
173 /// assert_eq!(all_current_and_b.get::<ValueB>(), Some(&ValueB(42)));
174 /// ```
175 pub fn current_with_value<T: 'static + Send + Sync>(value: T) -> Self {
176 Self::map_current(|cx| cx.with_value(value))
177 }
178
179 /// Returns a reference to the entry for the corresponding value type.
180 ///
181 /// # Examples
182 ///
183 /// ```
184 /// use opentelemetry::Context;
185 ///
186 /// // Given some value types defined in your application
187 /// #[derive(Debug, PartialEq)]
188 /// struct ValueA(&'static str);
189 /// #[derive(Debug, PartialEq)]
190 /// struct MyUser();
191 ///
192 /// let cx = Context::new().with_value(ValueA("a"));
193 ///
194 /// // Values can be queried by type
195 /// assert_eq!(cx.get::<ValueA>(), Some(&ValueA("a")));
196 ///
197 /// // And return none if not yet set
198 /// assert_eq!(cx.get::<MyUser>(), None);
199 /// ```
200 pub fn get<T: 'static>(&self) -> Option<&T> {
201 self.entries
202 .as_ref()?
203 .get(&TypeId::of::<T>())?
204 .downcast_ref()
205 }
206
207 /// Returns a copy of the context with the new value included.
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use opentelemetry::Context;
213 ///
214 /// // Given some value types defined in your application
215 /// #[derive(Debug, PartialEq)]
216 /// struct ValueA(&'static str);
217 /// #[derive(Debug, PartialEq)]
218 /// struct ValueB(u64);
219 ///
220 /// // You can create a context with the first value set to "a"
221 /// let cx_with_a = Context::new().with_value(ValueA("a"));
222 ///
223 /// // And create another context based on the fist with a new value
224 /// let cx_with_a_and_b = cx_with_a.with_value(ValueB(42));
225 ///
226 /// // The first context is still available and unmodified
227 /// assert_eq!(cx_with_a.get::<ValueA>(), Some(&ValueA("a")));
228 /// assert_eq!(cx_with_a.get::<ValueB>(), None);
229 ///
230 /// // The second context now contains both values
231 /// assert_eq!(cx_with_a_and_b.get::<ValueA>(), Some(&ValueA("a")));
232 /// assert_eq!(cx_with_a_and_b.get::<ValueB>(), Some(&ValueB(42)));
233 /// ```
234 pub fn with_value<T: 'static + Send + Sync>(&self, value: T) -> Self {
235 let entries = if let Some(current_entries) = &self.entries {
236 let mut inner_entries = (**current_entries).clone();
237 inner_entries.insert(TypeId::of::<T>(), Arc::new(value));
238 Some(Arc::new(inner_entries))
239 } else {
240 let mut entries = EntryMap::default();
241 entries.insert(TypeId::of::<T>(), Arc::new(value));
242 Some(Arc::new(entries))
243 };
244 Context {
245 entries,
246 #[cfg(feature = "trace")]
247 span: self.span.clone(),
248 suppress_telemetry: self.suppress_telemetry,
249 }
250 }
251
252 /// Replaces the current context on this thread with this context.
253 ///
254 /// Dropping the returned [`ContextGuard`] will reset the current context to the
255 /// previous value.
256 ///
257 ///
258 /// # Examples
259 ///
260 /// ```
261 /// use opentelemetry::Context;
262 ///
263 /// #[derive(Debug, PartialEq)]
264 /// struct ValueA(&'static str);
265 ///
266 /// let my_cx = Context::new().with_value(ValueA("a"));
267 ///
268 /// // Set the current thread context
269 /// let cx_guard = my_cx.attach();
270 /// assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA("a")));
271 ///
272 /// // Drop the guard to restore the previous context
273 /// drop(cx_guard);
274 /// assert_eq!(Context::current().get::<ValueA>(), None);
275 /// ```
276 ///
277 /// Guards do not need to be explicitly dropped:
278 ///
279 /// ```
280 /// use opentelemetry::Context;
281 ///
282 /// #[derive(Debug, PartialEq)]
283 /// struct ValueA(&'static str);
284 ///
285 /// fn my_function() -> String {
286 /// // attach a context the duration of this function.
287 /// let my_cx = Context::new().with_value(ValueA("a"));
288 /// // NOTE: a variable name after the underscore is **required** or rust
289 /// // will drop the guard, restoring the previous context _immediately_.
290 /// let _guard = my_cx.attach();
291 ///
292 /// // anything happening in functions we call can still access my_cx...
293 /// my_other_function();
294 ///
295 /// // returning from the function drops the guard, exiting the span.
296 /// return "Hello world".to_owned();
297 /// }
298 ///
299 /// fn my_other_function() {
300 /// // ...
301 /// }
302 /// ```
303 /// Sub-scopes may be created to limit the duration for which the span is
304 /// entered:
305 ///
306 /// ```
307 /// use opentelemetry::Context;
308 ///
309 /// #[derive(Debug, PartialEq)]
310 /// struct ValueA(&'static str);
311 ///
312 /// let my_cx = Context::new().with_value(ValueA("a"));
313 ///
314 /// {
315 /// let _guard = my_cx.attach();
316 ///
317 /// // the current context can access variables in
318 /// assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA("a")));
319 ///
320 /// // exiting the scope drops the guard, detaching the context.
321 /// }
322 ///
323 /// // this is back in the default empty context
324 /// assert_eq!(Context::current().get::<ValueA>(), None);
325 /// ```
326 pub fn attach(self) -> ContextGuard {
327 let cx_id = CURRENT_CONTEXT.with(|cx| cx.borrow_mut().push(self));
328
329 ContextGuard {
330 cx_pos: cx_id,
331 _marker: PhantomData,
332 }
333 }
334
335 /// Returns whether telemetry is suppressed in this context.
336 #[inline]
337 pub fn is_telemetry_suppressed(&self) -> bool {
338 self.suppress_telemetry
339 }
340
341 /// Returns a new context with telemetry suppression enabled.
342 pub fn with_telemetry_suppressed(&self) -> Self {
343 Context {
344 entries: self.entries.clone(),
345 #[cfg(feature = "trace")]
346 span: self.span.clone(),
347 suppress_telemetry: true,
348 }
349 }
350
351 /// Enters a scope where telemetry is suppressed.
352 ///
353 /// This method is specifically designed for OpenTelemetry components (like Exporters,
354 /// Processors etc.) to prevent generating recursive or self-referential
355 /// telemetry data when performing their own operations.
356 ///
357 /// Without suppression, we have a telemetry-induced-telemetry situation
358 /// where, operations like exporting telemetry could generate new telemetry
359 /// about the export process itself, potentially causing:
360 /// - Infinite telemetry feedback loops
361 /// - Excessive resource consumption
362 ///
363 /// This method:
364 /// 1. Takes the current context
365 /// 2. Creates a new context from current, with `suppress_telemetry` set to `true`
366 /// 3. Attaches it to the current thread
367 /// 4. Returns a guard that restores the previous context when dropped
368 ///
369 /// OTel SDK components would check `is_current_telemetry_suppressed()` before
370 /// generating new telemetry, but not end users.
371 ///
372 /// # Examples
373 ///
374 /// ```
375 /// use opentelemetry::Context;
376 ///
377 /// // Example: Inside an exporter's implementation
378 /// fn example_export_function() {
379 /// // Prevent telemetry-generating operations from creating more telemetry
380 /// let _guard = Context::enter_telemetry_suppressed_scope();
381 ///
382 /// // Verify suppression is active
383 /// assert_eq!(Context::is_current_telemetry_suppressed(), true);
384 ///
385 /// // Here you would normally perform operations that might generate telemetry
386 /// // but now they won't because the context has suppression enabled
387 /// }
388 ///
389 /// // Demonstrate the function
390 /// example_export_function();
391 /// ```
392 pub fn enter_telemetry_suppressed_scope() -> ContextGuard {
393 Self::map_current(|cx| cx.with_telemetry_suppressed()).attach()
394 }
395
396 /// Returns whether telemetry is suppressed in the current context.
397 ///
398 /// This method is used by OpenTelemetry components to determine whether they should
399 /// generate new telemetry in the current execution context. It provides a performant
400 /// way to check the suppression state.
401 ///
402 /// End-users generally should not use this method directly, as it is primarily intended for
403 /// OpenTelemetry SDK components.
404 ///
405 ///
406 #[inline]
407 pub fn is_current_telemetry_suppressed() -> bool {
408 Self::map_current(|cx| cx.is_telemetry_suppressed())
409 }
410
411 #[cfg(feature = "trace")]
412 pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
413 Self::map_current(|cx| Context {
414 span: Some(Arc::new(value)),
415 entries: cx.entries.clone(),
416 suppress_telemetry: cx.suppress_telemetry,
417 })
418 }
419
420 #[cfg(feature = "trace")]
421 pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
422 Context {
423 span: Some(Arc::new(value)),
424 entries: self.entries.clone(),
425 suppress_telemetry: self.suppress_telemetry,
426 }
427 }
428}
429
430impl fmt::Debug for Context {
431 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 let mut dbg = f.debug_struct("Context");
433
434 #[cfg(feature = "trace")]
435 let mut entries = self.entries.as_ref().map_or(0, |e| e.len());
436 #[cfg(feature = "trace")]
437 {
438 if let Some(span) = &self.span {
439 dbg.field("span", &span.span_context());
440 entries += 1;
441 } else {
442 dbg.field("span", &"None");
443 }
444 }
445 #[cfg(not(feature = "trace"))]
446 let entries = self.entries.as_ref().map_or(0, |e| e.len());
447
448 dbg.field("entries count", &entries)
449 .field("suppress_telemetry", &self.suppress_telemetry)
450 .finish()
451 }
452}
453
/// A guard that resets the current context to the prior context when dropped.
///
/// Guards are returned by [`Context::attach`] and
/// [`Context::enter_telemetry_suppressed_scope`].
#[derive(Debug)]
pub struct ContextGuard {
    // The position of the context in the stack. This is used to pop the context.
    // A value of `ContextStack::MAX_POS` marks a failed attach; dropping such a
    // guard leaves the stack untouched.
    cx_pos: u16,
    // Ensure this type is !Send as it relies on thread locals
    _marker: PhantomData<*const ()>,
}
462
463impl Drop for ContextGuard {
464 fn drop(&mut self) {
465 let id = self.cx_pos;
466 if id > ContextStack::BASE_POS && id < ContextStack::MAX_POS {
467 CURRENT_CONTEXT.with(|context_stack| context_stack.borrow_mut().pop_id(id));
468 }
469 }
470}
471
/// Pass-through hasher for `TypeId` keys.
///
/// A `TypeId` is already a compiler-generated hash, so hashing it again is
/// wasted work. `IdHasher` stores the `u64` it is given and reports that same
/// value as the final hash, without any bit mixing.
#[derive(Debug, Default, Clone)]
struct IdHasher(u64);

impl Hasher for IdHasher {
    #[inline]
    fn finish(&self) -> u64 {
        let Self(id) = self;
        *id
    }

    fn write(&mut self, _: &[u8]) {
        unreachable!("TypeId calls write_u64");
    }

    #[inline]
    fn write_u64(&mut self, id: u64) {
        *self = Self(id);
    }
}
493
/// A stack for keeping track of the [`Context`] instances that have been attached
/// to a thread.
///
/// The stack allows for popping of contexts by position, which is used to do out
/// of order dropping of [`ContextGuard`] instances. Only when the top of the
/// stack is popped, the topmost [`Context`] is actually restored.
///
/// The stack relies on the fact that it is thread local and that the
/// [`ContextGuard`] instances that are constructed using ids from it can't be
/// moved to other threads. That means that the ids are always valid and that
/// they are always within the bounds of the stack.
struct ContextStack {
    /// This is the current [`Context`] that is active on this thread, and the top
    /// of the [`ContextStack`]. It is always present, and if the `stack` is empty
    /// it's an empty [`Context`].
    ///
    /// Having this here allows for fast access to the current [`Context`].
    current_cx: Context,
    /// A `stack` of the other contexts that have been attached to the thread.
    ///
    /// A `None` slot marks a context whose guard was dropped out of order;
    /// such slots are discarded the next time the top of the stack is popped.
    stack: Vec<Option<Context>>,
    /// Ensure this type is !Send as it relies on thread locals
    _marker: PhantomData<*const ()>,
}
517
518impl ContextStack {
519 const BASE_POS: u16 = 0;
520 const MAX_POS: u16 = u16::MAX;
521 const INITIAL_CAPACITY: usize = 8;
522
523 #[inline(always)]
524 fn push(&mut self, cx: Context) -> u16 {
525 // The next id is the length of the `stack`, plus one since we have the
526 // top of the [`ContextStack`] as the `current_cx`.
527 let next_id = self.stack.len() + 1;
528 if next_id < ContextStack::MAX_POS.into() {
529 let current_cx = std::mem::replace(&mut self.current_cx, cx);
530 self.stack.push(Some(current_cx));
531 next_id as u16
532 } else {
533 // This is an overflow, log it and ignore it.
534 otel_warn!(
535 name: "Context.AttachFailed",
536 message = format!("Too many contexts. Max limit is {}. \
537 Context::current() remains unchanged as this attach failed. \
538 Dropping the returned ContextGuard will have no impact on Context::current().",
539 ContextStack::MAX_POS)
540 );
541 ContextStack::MAX_POS
542 }
543 }
544
545 #[inline(always)]
546 fn pop_id(&mut self, pos: u16) {
547 if pos == ContextStack::BASE_POS || pos == ContextStack::MAX_POS {
548 // The empty context is always at the bottom of the [`ContextStack`]
549 // and cannot be popped, and the overflow position is invalid, so do
550 // nothing.
551 otel_warn!(
552 name: "Context.OutOfOrderDrop",
553 position = pos,
554 message = if pos == ContextStack::BASE_POS {
555 "Attempted to pop the base context which is not allowed"
556 } else {
557 "Attempted to pop the overflow position which is not allowed"
558 }
559 );
560 return;
561 }
562 let len: u16 = self.stack.len() as u16;
563 // Are we at the top of the [`ContextStack`]?
564 if pos == len {
565 // Shrink the stack if possible to clear out any out of order pops.
566 while let Some(None) = self.stack.last() {
567 _ = self.stack.pop();
568 }
569 // Restore the previous context. This will always happen since the
570 // empty context is always at the bottom of the stack if the
571 // [`ContextStack`] is not empty.
572 if let Some(Some(next_cx)) = self.stack.pop() {
573 self.current_cx = next_cx;
574 }
575 } else {
576 // This is an out of order pop.
577 if pos >= len {
578 // This is an invalid id, ignore it.
579 otel_warn!(
580 name: "Context.PopOutOfBounds",
581 position = pos,
582 stack_length = len,
583 message = "Attempted to pop beyond the end of the context stack"
584 );
585 return;
586 }
587 // Clear out the entry at the given id.
588 _ = self.stack[pos as usize].take();
589 }
590 }
591
592 #[inline(always)]
593 fn map_current_cx<T>(&self, f: impl FnOnce(&Context) -> T) -> T {
594 f(&self.current_cx)
595 }
596}
597
598impl Default for ContextStack {
599 fn default() -> Self {
600 ContextStack {
601 current_cx: Context::default(),
602 stack: Vec::with_capacity(ContextStack::INITIAL_CAPACITY),
603 _marker: PhantomData,
604 }
605 }
606}
607
608#[cfg(test)]
609mod tests {
610 use super::*;
611 use std::time::Duration;
612 use tokio::time::sleep;
613
/// First marker value type stored in contexts by the tests.
#[derive(PartialEq, Debug)]
struct ValueA(u64);
/// Second marker value type stored in contexts by the tests.
#[derive(PartialEq, Debug)]
struct ValueB(u64);
618
/// `with_value` must return a new context and leave its receiver untouched.
#[test]
fn context_immutable() {
    // The thread's current context is expected to start out empty.
    let base = Context::current();
    assert_eq!(base.get::<ValueA>(), None);
    assert_eq!(base.get::<ValueB>(), None);

    // Deriving a context that holds `ValueA` must not modify `base`.
    let with_a = base.with_value(ValueA(1));
    assert_eq!(base.get::<ValueA>(), None);
    assert_eq!(base.get::<ValueB>(), None);

    // The derived context carries the new value.
    assert_eq!(with_a.get::<ValueA>(), Some(&ValueA(1)));

    // Deriving once more must modify neither `base` nor `with_a`.
    let with_a_and_b = with_a.with_value(ValueB(1));
    assert_eq!(base.get::<ValueA>(), None);
    assert_eq!(base.get::<ValueB>(), None);
    assert_eq!(with_a.get::<ValueA>(), Some(&ValueA(1)));
    assert_eq!(with_a.get::<ValueB>(), None);

    // The newest context holds both values.
    assert_eq!(with_a_and_b.get::<ValueA>(), Some(&ValueA(1)));
    assert_eq!(with_a_and_b.get::<ValueB>(), Some(&ValueB(1)));
}
650
/// An inner attach adds to the outer context and is undone when its guard
/// goes out of scope.
#[test]
fn nested_contexts() {
    let _outer_guard = Context::new().with_value(ValueA(1)).attach();

    // Outer scope: only `ValueA` is visible.
    let outer = Context::current();
    assert_eq!(outer.get(), Some(&ValueA(1)));
    assert_eq!(outer.get::<ValueB>(), None);

    {
        let _inner_guard = Context::current_with_value(ValueB(42)).attach();

        // Inner scope: both values are visible through a snapshot...
        let inner = Context::current();
        assert_eq!(inner.get(), Some(&ValueA(1)));
        assert_eq!(inner.get(), Some(&ValueB(42)));

        // ...and through `map_current`.
        let closure_ran = Context::map_current(|cx| {
            assert_eq!(cx.get(), Some(&ValueA(1)));
            assert_eq!(cx.get(), Some(&ValueB(42)));
            true
        });
        assert!(closure_ran);
    }

    // Back in the outer scope: `ValueB` is gone again.
    let restored = Context::current();
    assert_eq!(restored.get(), Some(&ValueA(1)));
    assert_eq!(restored.get::<ValueB>(), None);

    let closure_ran = Context::map_current(|cx| {
        assert_eq!(cx.get(), Some(&ValueA(1)));
        assert_eq!(cx.get::<ValueB>(), None);
        true
    });
    assert!(closure_ran);
}
685
/// Dropping the outer guard before the inner one must keep the inner context
/// current until the inner guard is dropped too.
#[test]
fn overlapping_contexts() {
    let outer_guard = Context::new().with_value(ValueA(1)).attach();

    // After the first attach only `ValueA` is visible.
    let after_outer = Context::current();
    assert_eq!(after_outer.get(), Some(&ValueA(1)));
    assert_eq!(after_outer.get::<ValueB>(), None);

    let inner_guard = Context::current_with_value(ValueB(42)).attach();

    // After the second attach both values are visible.
    let after_inner = Context::current();
    assert_eq!(after_inner.get(), Some(&ValueA(1)));
    assert_eq!(after_inner.get(), Some(&ValueB(42)));

    let closure_ran = Context::map_current(|cx| {
        assert_eq!(cx.get(), Some(&ValueA(1)));
        assert_eq!(cx.get(), Some(&ValueB(42)));
        true
    });
    assert!(closure_ran);

    // Out-of-order drop: `inner_guard` is still alive, so both `ValueA` and
    // `ValueB` should still be accessible.
    drop(outer_guard);
    let after_outer_drop = Context::current();
    assert_eq!(after_outer_drop.get(), Some(&ValueA(1)));
    assert_eq!(after_outer_drop.get(), Some(&ValueB(42)));

    // With both guards dropped neither value should be accessible.
    drop(inner_guard);
    let after_all_drops = Context::current();
    assert_eq!(after_all_drops.get::<ValueA>(), None);
    assert_eq!(after_all_drops.get::<ValueB>(), None);
}
721
/// Attaching more contexts than the stack can hold must fail softly: the
/// current context stays as it is and the returned guards are inert.
#[test]
fn too_many_contexts() {
    // Checks the `ValueA` / `ValueB` pair visible in the current context.
    fn assert_current(expected_a: Option<ValueA>, expected_b: u64) {
        let current = Context::current();
        assert_eq!(current.get::<ValueA>(), expected_a.as_ref());
        assert_eq!(current.get(), Some(&ValueB(expected_b)));
    }

    let max_pos = ContextStack::MAX_POS;
    let last_value = u64::from(max_pos) - 1;
    let mut guards: Vec<ContextGuard> = Vec::with_capacity(usize::from(max_pos));

    // Fill the stack up until the last position
    for pos in 1..max_pos {
        let value = u64::from(pos);
        let guard = Context::current().with_value(ValueB(value)).attach();
        assert_eq!(Context::current().get(), Some(&ValueB(value)));
        assert_eq!(guard.cx_pos, pos);
        guards.push(guard);
    }

    // Let's overflow the stack a couple of times
    for _ in 0..16 {
        let guard = Context::current().with_value(ValueA(1)).attach();
        assert_eq!(guard.cx_pos, max_pos);
        assert_current(None, last_value);
        guards.push(guard);
    }

    // Drop the overflow contexts
    for _ in 0..16 {
        drop(guards.pop());
        assert_current(None, last_value);
    }

    // Drop one more so we can add a new one
    drop(guards.pop());
    assert_current(None, last_value - 1);

    // Push a new context and see that it works
    let guard = Context::current().with_value(ValueA(2)).attach();
    assert_eq!(guard.cx_pos, max_pos - 1);
    assert_current(Some(ValueA(2)), last_value - 1);
    guards.push(guard);

    // Let's overflow the stack a couple of times again
    for _ in 0..16 {
        let guard = Context::current().with_value(ValueA(1)).attach();
        assert_eq!(guard.cx_pos, max_pos);
        assert_current(Some(ValueA(2)), last_value - 1);
        guards.push(guard);
    }
}
766
/// Tests that a new ContextStack is created empty and with room for at least
/// the configured initial capacity.
#[test]
fn test_initial_capacity() {
    let stack = ContextStack::default();
    // `Vec::with_capacity` only guarantees a lower bound on the capacity
    // (it is allowed to allocate more), so exact equality would be fragile.
    assert!(stack.stack.capacity() >= ContextStack::INITIAL_CAPACITY);
    assert!(stack.stack.is_empty());
}
773
/// Tests that `map_current_cx` passes the stack's current context to the
/// closure and forwards the closure's return value.
#[test]
fn test_map_current_cx() {
    let mut stack = ContextStack::default();
    stack.current_cx = Context::new().with_value(ValueA(42));

    let closure_ran = stack.map_current_cx(|current| {
        assert_eq!(current.get::<ValueA>(), Some(&ValueA(42)));
        true
    });

    assert!(closure_ran);
}
787
/// Tests popping contexts in non-sequential order.
#[test]
fn test_pop_id_out_of_order() {
    let mut stack = ContextStack::default();

    // Push three contexts holding `ValueA(1)`, `ValueA(2)` and `ValueA(3)`.
    let first_id = stack.push(Context::new().with_value(ValueA(1)));
    let middle_id = stack.push(Context::new().with_value(ValueA(2)));
    let last_id = stack.push(Context::new().with_value(ValueA(3)));

    // Popping the middle one first must not affect the current context,
    // and the stack length stays the same for middle pops.
    stack.pop_id(middle_id);
    assert_eq!(stack.current_cx.get::<ValueA>(), Some(&ValueA(3)));
    assert_eq!(stack.stack.len(), 3);

    // Popping the last one restores the previous context that is still valid.
    stack.pop_id(last_id);
    assert_eq!(stack.current_cx.get::<ValueA>(), Some(&ValueA(1)));
    assert_eq!(stack.stack.len(), 1);

    // Popping the first one restores the empty state.
    stack.pop_id(first_id);
    assert_eq!(stack.current_cx.get::<ValueA>(), None);
    assert_eq!(stack.stack.len(), 0);
}
817
/// Tests edge cases in context stack operations. IRL these should log
/// warnings, and definitely not panic.
#[test]
fn test_pop_id_edge_cases() {
    let mut stack = ContextStack::default();

    // Every one of these pops must be a no-op on an empty stack:
    // the base position, the overflow position, an invalid position,
    // and the first regular position.
    let positions = [ContextStack::BASE_POS, ContextStack::MAX_POS, 1000, 1];
    for pos in positions {
        stack.pop_id(pos);
        assert_eq!(stack.stack.len(), 0);
    }
}
840
/// Tests stack behavior when reaching maximum capacity.
/// Once we push beyond this point, we should end up with a context
/// that points _somewhere_, but mutating it should not affect the current
/// active context.
#[test]
fn test_push_overflow() {
    let mut stack = ContextStack::default();
    let max_pos = ContextStack::MAX_POS as usize;

    // Fill every usable position. Valid ids are `1..MAX_POS`, so exactly
    // `max_pos - 1` pushes succeed. Iterating one step further would already
    // overflow, and would go unnoticed because the overflow sentinel
    // `MAX_POS` happens to equal `(i + 1) as u16` on that iteration.
    for i in 0..max_pos - 1 {
        let cx = Context::new().with_value(ValueA(i as u64));
        let id = stack.push(cx);
        assert_eq!(id, (i + 1) as u16);
    }
    assert_eq!(stack.stack.len(), max_pos - 1);

    // Try to push beyond capacity
    let cx = Context::new().with_value(ValueA(max_pos as u64));
    let id = stack.push(cx);
    assert_eq!(id, ContextStack::MAX_POS);

    // Verify the failed push stored nothing and left the current context
    // unchanged
    assert_eq!(stack.stack.len(), max_pos - 1);
    assert_eq!(
        stack.current_cx.get::<ValueA>(),
        Some(&ValueA((max_pos - 2) as u64))
    );
}
868
869 /// Tests that:
870 /// 1. Parent context values are properly propagated to async operations
871 /// 2. Values added during async operations do not affect parent context
872 #[tokio::test]
873 async fn test_async_context_propagation() {
874 // A nested async operation we'll use to test propagation
875 async fn nested_operation() {
876 // Verify we can see the parent context's value
877 assert_eq!(
878 Context::current().get::<ValueA>(),
879 Some(&ValueA(42)),
880 "Parent context value should be available in async operation"
881 );
882
883 // Create new context
884 let cx_with_both = Context::current()
885 .with_value(ValueA(43)) // override ValueA
886 .with_value(ValueB(24)); // Add new ValueB
887
888 // Run nested async operation with both values
889 async {
890 // Verify both values are available
891 assert_eq!(
892 Context::current().get::<ValueA>(),
893 Some(&ValueA(43)),
894 "Parent value should still be available after adding new value"
895 );
896 assert_eq!(
897 Context::current().get::<ValueB>(),
898 Some(&ValueB(24)),
899 "New value should be available in async operation"
900 );
901
902 // Do some async work to simulate real-world scenario
903 sleep(Duration::from_millis(10)).await;
904
905 // Values should still be available after async work
906 assert_eq!(
907 Context::current().get::<ValueA>(),
908 Some(&ValueA(43)),
909 "Parent value should persist across await points"
910 );
911 assert_eq!(
912 Context::current().get::<ValueB>(),
913 Some(&ValueB(24)),
914 "New value should persist across await points"
915 );
916 }
917 .with_context(cx_with_both)
918 .await;
919 }
920
921 // Set up initial context with ValueA
922 let parent_cx = Context::new().with_value(ValueA(42));
923
924 // Create and run async operation with the parent context explicitly propagated
925 nested_operation().with_context(parent_cx.clone()).await;
926
927 // After async operation completes:
928 // 1. Parent context should be unchanged
929 assert_eq!(
930 parent_cx.get::<ValueA>(),
931 Some(&ValueA(42)),
932 "Parent context should be unchanged"
933 );
934 assert_eq!(
935 parent_cx.get::<ValueB>(),
936 None,
937 "Parent context should not see values added in async operation"
938 );
939
940 // 2. Current context should be back to default
941 assert_eq!(
942 Context::current().get::<ValueA>(),
943 None,
944 "Current context should be back to default"
945 );
946 assert_eq!(
947 Context::current().get::<ValueB>(),
948 None,
949 "Current context should not have async operation's values"
950 );
951 }
952
953 ///
954 /// Tests that unnatural parent->child relationships in nested async
955 /// operations behave properly.
956 ///
957 #[tokio::test]
958 async fn test_out_of_order_context_detachment_futures() {
959 // This function returns a future, but doesn't await it
960 // It will complete before the future that it creates.
961 async fn create_a_future() -> impl std::future::Future<Output = ()> {
962 // Create a future that will do some work, referencing our current
963 // context, but don't await it.
964 async {
965 assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
966
967 // Longer work
968 sleep(Duration::from_millis(50)).await;
969 }
970 .with_context(Context::current())
971 }
972
973 // Create our base context
974 let parent_cx = Context::new().with_value(ValueA(42));
975
976 // await our nested function, which will create and detach a context
977 let future = create_a_future().with_context(parent_cx).await;
978
979 // Execute the future. The future that created it is long gone, but this shouldn't
980 // cause issues.
981 let _a = future.await;
982
983 // Nothing terrible (e.g., panics!) should happen, and we should definitely not have any
984 // values attached to our current context that were set in the nested operations.
985 assert_eq!(Context::current().get::<ValueA>(), None);
986 assert_eq!(Context::current().get::<ValueB>(), None);
987 }
988
/// Checks `is_telemetry_suppressed` on a fresh context and on the context
/// derived from it via `with_telemetry_suppressed`.
#[test]
fn test_is_telemetry_suppressed() {
    let baseline = Context::new();
    let muted = baseline.with_telemetry_suppressed();

    // A newly created context reports suppression as disabled.
    assert!(!baseline.is_telemetry_suppressed());

    // The derived context reports suppression as enabled.
    assert!(muted.is_telemetry_suppressed());
}
999
/// Checks that `with_telemetry_suppressed` yields a new suppressed context,
/// leaves the source context untouched, and keeps previously stored values.
#[test]
fn test_with_telemetry_suppressed() {
    // Phase 1: an empty context.
    let base = Context::new();
    {
        assert!(!base.is_telemetry_suppressed());

        let muted = base.with_telemetry_suppressed();

        // Deriving must not flip the flag on the source context ...
        assert!(!base.is_telemetry_suppressed());
        // ... while the derived context has it set.
        assert!(muted.is_telemetry_suppressed());
    }

    // Phase 2: a context that already holds a value, to verify the value
    // survives the derivation.
    {
        let valued = base.with_value(ValueA(42));
        let muted_valued = valued.with_telemetry_suppressed();

        assert!(!valued.is_telemetry_suppressed());
        assert!(muted_valued.is_telemetry_suppressed());
        assert_eq!(muted_valued.get::<ValueA>(), Some(&ValueA(42)));
    }
}
1023
/// Checks that `enter_telemetry_suppressed_scope` turns suppression on for
/// the lifetime of its guard only, and that values already present in the
/// current context stay visible both inside and after the scope.
#[test]
fn test_enter_telemetry_suppressed_scope() {
    // Begin from an empty context so earlier state cannot interfere.
    let _clean_slate = Context::new().attach();
    assert!(!Context::is_current_telemetry_suppressed());

    // Make a value part of the current context.
    let _value_guard = Context::current().with_value(ValueA(42)).attach();

    // The value is visible and suppression is still off.
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
    assert!(!Context::is_current_telemetry_suppressed());

    // Enter the suppressed scope; it lasts until the guard is dropped below.
    let scope_guard = Context::enter_telemetry_suppressed_scope();

    // Inside the scope: suppression is on and the value is still there.
    assert!(Context::is_current_telemetry_suppressed());
    assert!(Context::current().is_telemetry_suppressed());
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));

    drop(scope_guard);

    // Outside the scope again: suppression is off, the value remains.
    assert!(!Context::is_current_telemetry_suppressed());
    assert!(!Context::current().is_telemetry_suppressed());
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
}
1055
/// Checks how suppression behaves when other contexts are attached while a
/// suppressed scope is active.
#[test]
fn test_nested_suppression_scopes() {
    // Begin from an empty context so earlier state cannot interfere.
    let _clean_slate = Context::new().attach();
    assert!(!Context::is_current_telemetry_suppressed());

    // Outer level: explicitly enter a suppressed scope.
    let outer_scope = Context::enter_telemetry_suppressed_scope();
    assert!(Context::is_current_telemetry_suppressed());

    // Case 1: a suppression-unaware component attaches a context derived
    // from the current one, which therefore already carries suppression.
    let derived_guard = Context::current().with_value(ValueA(1)).attach();
    assert!(Context::is_current_telemetry_suppressed());
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(1)));
    drop(derived_guard);

    // Case 2: a suppression-unaware component attaches a brand-new context
    // that is not derived from the current one, so suppression is absent.
    let fresh_guard = Context::new().with_value(ValueA(1)).attach();
    assert!(!Context::is_current_telemetry_suppressed());
    assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(1)));
    drop(fresh_guard);

    // With both inner contexts detached, the outer scope is in effect again.
    assert!(Context::is_current_telemetry_suppressed());
    drop(outer_scope);

    // Leaving the outer scope restores the unsuppressed state.
    assert!(!Context::is_current_telemetry_suppressed());
}
1094
1095 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1096 async fn test_async_suppression() {
1097 async fn nested_operation() {
1098 assert!(Context::is_current_telemetry_suppressed());
1099
1100 let cx_with_additional_value = Context::current().with_value(ValueB(24));
1101
1102 async {
1103 assert_eq!(
1104 Context::current().get::<ValueB>(),
1105 Some(&ValueB(24)),
1106 "Parent value should still be available after adding new value"
1107 );
1108 assert!(Context::is_current_telemetry_suppressed());
1109
1110 // Do some async work to simulate real-world scenario
1111 sleep(Duration::from_millis(10)).await;
1112
1113 // Values should still be available after async work
1114 assert_eq!(
1115 Context::current().get::<ValueB>(),
1116 Some(&ValueB(24)),
1117 "Parent value should still be available after adding new value"
1118 );
1119 assert!(Context::is_current_telemetry_suppressed());
1120 }
1121 .with_context(cx_with_additional_value)
1122 .await;
1123 }
1124
1125 // Set up suppressed context, but don't attach it to current
1126 let suppressed_parent = Context::new().with_telemetry_suppressed();
1127 // Current should not be suppressed as we haven't attached it
1128 assert!(!Context::is_current_telemetry_suppressed());
1129
1130 // Create and run async operation with the suppressed context explicitly propagated
1131 nested_operation()
1132 .with_context(suppressed_parent.clone())
1133 .await;
1134
1135 // After async operation completes:
1136 // Suppression should be active
1137 assert!(suppressed_parent.is_telemetry_suppressed());
1138
1139 // Current should still be not suppressed
1140 assert!(!Context::is_current_telemetry_suppressed());
1141 }
1142}