linera_rpc/
propagation.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! OpenTelemetry context propagation for gRPC.
5//!
6//! This module provides utilities for propagating OpenTelemetry context (trace context and baggage)
7//! across gRPC service boundaries using tonic metadata.
8//!
9//! # Usage
10//!
11//! ## Client-side injection
12//!
13//! ```ignore
14//! use linera_rpc::propagation::inject_context;
15//! use opentelemetry::Context;
16//!
17//! let mut request = tonic::Request::new(payload);
18//! inject_context(&Context::current(), request.metadata_mut());
19//! ```
20//!
21//! ## Server-side extraction
22//!
23//! ```ignore
24//! use linera_rpc::propagation::extract_context;
25//!
26//! let cx = extract_context(request.metadata());
27//! // Use cx.with_baggage() to access baggage values
28//! ```
29
30use std::task::{Context as TaskContext, Poll};
31
32use futures::{future::BoxFuture, FutureExt};
33use opentelemetry::{
34    global,
35    propagation::{Extractor, Injector},
36    Context,
37};
38use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
39use tower::{Layer, Service};
40use tracing::warn;
41
42/// Baggage key for traffic type labeling.
43///
44/// Used to distinguish organic traffic from synthetic (benchmark) traffic.
45/// Valid values are "organic" and "synthetic".
46pub const TRAFFIC_TYPE_KEY: &str = "traffic_type";
47
48/// Traffic type for normal production traffic.
49pub const TRAFFIC_TYPE_ORGANIC: &str = "organic";
50
51/// Traffic type for synthetic benchmark traffic.
52pub const TRAFFIC_TYPE_SYNTHETIC: &str = "synthetic";
53
54/// Traffic type when OpenTelemetry feature is disabled.
55pub const TRAFFIC_TYPE_UNKNOWN: &str = "unknown";
56
57/// Environment variable to override the traffic type.
58///
59/// Set this to "synthetic" to mark all outgoing requests as benchmark traffic.
60/// This is useful for benchmark tools that cannot easily set OpenTelemetry baggage.
61pub const TRAFFIC_TYPE_ENV_VAR: &str = "LINERA_TRAFFIC_TYPE";
62
63/// Tower layer that extracts OpenTelemetry context from incoming gRPC requests.
64///
65/// This layer extracts W3C TraceContext and Baggage headers from the request
66/// metadata and stores the extracted context in the request extensions.
67///
68/// # Usage
69///
70/// ```ignore
71/// use linera_rpc::propagation::OtelContextLayer;
72/// use tower::ServiceBuilder;
73///
74/// let service = ServiceBuilder::new()
75///     .layer(OtelContextLayer)
76///     .service(my_service);
77/// ```
78#[derive(Clone, Copy, Debug, Default)]
79pub struct OtelContextLayer;
80
81/// Service wrapper that extracts OpenTelemetry context from requests.
82#[derive(Clone, Debug)]
83pub struct OtelContextService<S> {
84    inner: S,
85}
86
87/// Extension type to store the extracted OpenTelemetry context.
88#[derive(Clone, Debug)]
89pub struct ExtractedOtelContext(pub Context);
90
91/// Trait for request types that can provide access to extracted OpenTelemetry context.
92///
93/// This trait abstracts over `http::Request` and `tonic::Request` to allow
94/// generic functions that work with either request type.
95pub trait HasOtelContext {
96    fn get_otel_context(&self) -> Option<&ExtractedOtelContext>;
97}
98
99impl<B> HasOtelContext for http::Request<B> {
100    fn get_otel_context(&self) -> Option<&ExtractedOtelContext> {
101        self.extensions().get::<ExtractedOtelContext>()
102    }
103}
104
105impl<T> HasOtelContext for tonic::Request<T> {
106    fn get_otel_context(&self) -> Option<&ExtractedOtelContext> {
107        self.extensions().get::<ExtractedOtelContext>()
108    }
109}
110
111/// Injects the OpenTelemetry context into tonic metadata.
112///
113/// This injects both W3C TraceContext (`traceparent`, `tracestate`) and
114/// W3C Baggage (`baggage`) headers into the metadata, enabling distributed
115/// tracing and baggage propagation across gRPC service boundaries.
116///
117/// # Arguments
118///
119/// * `cx` - The OpenTelemetry context to inject
120/// * `metadata` - The tonic metadata map to inject into
121pub fn inject_context(cx: &Context, metadata: &mut MetadataMap) {
122    global::get_text_map_propagator(|propagator| {
123        propagator.inject_context(cx, &mut MetadataInjector(metadata));
124    });
125}
126
127/// Extracts the OpenTelemetry context from tonic metadata.
128///
129/// This extracts both W3C TraceContext and W3C Baggage headers from the
130/// metadata, returning a context that can be used as a parent for new spans
131/// or to read baggage values.
132///
133/// # Arguments
134///
135/// * `metadata` - The tonic metadata map to extract from
136///
137/// # Returns
138///
139/// The extracted OpenTelemetry context, or an empty context if no propagation
140/// headers were found.
141pub fn extract_context(metadata: &MetadataMap) -> Context {
142    global::get_text_map_propagator(|propagator| propagator.extract(&MetadataExtractor(metadata)))
143}
144
145/// Returns the current OpenTelemetry context, enriched with traffic type baggage
146/// if the `LINERA_TRAFFIC_TYPE` environment variable is set.
147///
148/// This function provides a workaround for async code that cannot hold a `ContextGuard`
149/// across `.await` points (since `ContextGuard` is `!Send`).
150///
151/// If `LINERA_TRAFFIC_TYPE=synthetic` is set, the returned context will have
152/// synthetic traffic baggage attached.
153pub fn get_context_with_traffic_type() -> Context {
154    use opentelemetry::{baggage::BaggageExt, Key, KeyValue};
155
156    let cx = Context::current();
157
158    if std::env::var(TRAFFIC_TYPE_ENV_VAR)
159        .map(|v| v == TRAFFIC_TYPE_SYNTHETIC)
160        .unwrap_or(false)
161    {
162        cx.with_baggage(vec![KeyValue::new(
163            Key::new(TRAFFIC_TYPE_KEY),
164            TRAFFIC_TYPE_SYNTHETIC,
165        )])
166    } else {
167        cx
168    }
169}
170
171/// Extracts the traffic type from the current context.
172///
173/// Returns "organic" if no traffic type baggage is set.
174pub fn get_traffic_type(cx: &Context) -> &'static str {
175    use opentelemetry::baggage::BaggageExt;
176
177    cx.baggage()
178        .get(TRAFFIC_TYPE_KEY)
179        .map(|v| v.as_str())
180        .and_then(|v| {
181            if v == TRAFFIC_TYPE_SYNTHETIC {
182                Some(TRAFFIC_TYPE_SYNTHETIC)
183            } else {
184                None
185            }
186        })
187        .unwrap_or(TRAFFIC_TYPE_ORGANIC)
188}
189
190/// Gets the traffic type from a request's extensions.
191///
192/// Works with both `http::Request` and `tonic::Request` via the `HasOtelContext` trait.
193/// Returns "organic" if no context was extracted or no traffic type baggage was set.
194pub fn get_traffic_type_from_request<R: HasOtelContext>(request: &R) -> &'static str {
195    request
196        .get_otel_context()
197        .map_or(TRAFFIC_TYPE_ORGANIC, |ext| get_traffic_type(&ext.0))
198}
199
200/// Gets the OpenTelemetry context from a tonic::Request's extensions.
201///
202/// Returns `None` if no context was extracted by OtelContextLayer.
203/// Tower middleware extensions are preserved in tonic::Request extensions.
204pub fn get_otel_context_from_tonic_request<T>(request: &tonic::Request<T>) -> Option<Context> {
205    request
206        .extensions()
207        .get::<ExtractedOtelContext>()
208        .map(|ext| ext.0.clone())
209}
210
211/// Creates a new tonic::Request with the OpenTelemetry context injected into metadata.
212///
213/// This is used to propagate context when forwarding requests to downstream services.
214/// If `cx` is `None`, returns a request without injected context.
215pub fn create_request_with_context<T>(inner: T, cx: Option<&Context>) -> tonic::Request<T> {
216    let mut request = tonic::Request::new(inner);
217    if let Some(cx) = cx {
218        inject_context(cx, request.metadata_mut());
219    }
220    request
221}
222
223/// Creates a new tonic::Request with the current tracing span's context injected.
224///
225/// This gets the OpenTelemetry context from the current tracing span and injects it
226/// into the request metadata. Use this when forwarding requests to downstream services
227/// to ensure proper distributed tracing (child spans linked to parent spans).
228pub fn create_request_with_current_span_context<T>(inner: T) -> tonic::Request<T> {
229    use tracing_opentelemetry::OpenTelemetrySpanExt;
230    let cx = tracing::Span::current().context();
231    create_request_with_context(inner, Some(&cx))
232}
233
234impl<S> Layer<S> for OtelContextLayer {
235    type Service = OtelContextService<S>;
236
237    fn layer(&self, service: S) -> Self::Service {
238        OtelContextService { inner: service }
239    }
240}
241
242impl<S, B> Service<http::Request<B>> for OtelContextService<S>
243where
244    S: Service<http::Request<B>> + Clone + Send + 'static,
245    S::Future: Send,
246    B: Send + 'static,
247{
248    type Response = S::Response;
249    type Error = S::Error;
250    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
251
252    fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
253        self.inner.poll_ready(cx)
254    }
255
256    fn call(&mut self, mut request: http::Request<B>) -> Self::Future {
257        use tracing::Instrument;
258        use tracing_opentelemetry::OpenTelemetrySpanExt;
259
260        let cx = global::get_text_map_propagator(|propagator| {
261            propagator.extract(&HttpHeaderExtractor(request.headers()))
262        });
263
264        request
265            .extensions_mut()
266            .insert(ExtractedOtelContext(cx.clone()));
267
268        let span = tracing::info_span!("grpc_request");
269        span.set_parent(cx);
270
271        let mut inner = self.inner.clone();
272        async move { inner.call(request).await }
273            .instrument(span)
274            .boxed()
275    }
276}
277
278struct MetadataInjector<'a>(&'a mut MetadataMap);
279
280impl Injector for MetadataInjector<'_> {
281    fn set(&mut self, key: &str, value: String) {
282        match MetadataKey::from_bytes(key.as_bytes()) {
283            Ok(key) => match MetadataValue::try_from(&value) {
284                Ok(value) => {
285                    self.0.insert(key, value);
286                }
287                Err(error) => {
288                    warn!(
289                        value,
290                        error = format!("{error:#}"),
291                        "failed to parse metadata value"
292                    );
293                }
294            },
295            Err(error) => {
296                warn!(
297                    key,
298                    error = format!("{error:#}"),
299                    "failed to parse metadata key"
300                );
301            }
302        }
303    }
304}
305
306struct MetadataExtractor<'a>(&'a MetadataMap);
307
308impl Extractor for MetadataExtractor<'_> {
309    fn get(&self, key: &str) -> Option<&str> {
310        self.0.get(key).and_then(|value| value.to_str().ok())
311    }
312
313    fn keys(&self) -> Vec<&str> {
314        self.0
315            .keys()
316            .filter_map(|key| match key {
317                tonic::metadata::KeyRef::Ascii(key) => Some(key.as_str()),
318                tonic::metadata::KeyRef::Binary(_) => None,
319            })
320            .collect()
321    }
322}
323
324struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
325
326impl Extractor for HttpHeaderExtractor<'_> {
327    fn get(&self, key: &str) -> Option<&str> {
328        self.0.get(key).and_then(|v| v.to_str().ok())
329    }
330
331    fn keys(&self) -> Vec<&str> {
332        self.0.keys().map(|k| k.as_str()).collect()
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use opentelemetry::{baggage::BaggageExt, Key, KeyValue};
339
340    use super::*;
341
342    #[test]
343    fn test_inject_and_extract_baggage() {
344        use opentelemetry::propagation::TextMapCompositePropagator;
345        use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
346
347        let propagator = TextMapCompositePropagator::new(vec![
348            Box::new(TraceContextPropagator::new()),
349            Box::new(BaggagePropagator::new()),
350        ]);
351        global::set_text_map_propagator(propagator);
352
353        let cx = Context::current().with_baggage(vec![KeyValue::new(
354            Key::new(TRAFFIC_TYPE_KEY),
355            TRAFFIC_TYPE_SYNTHETIC,
356        )]);
357
358        let mut metadata = MetadataMap::new();
359        inject_context(&cx, &mut metadata);
360
361        assert!(
362            metadata.get("baggage").is_some(),
363            "baggage header should be present"
364        );
365
366        let extracted_cx = extract_context(&metadata);
367        let traffic_type = get_traffic_type(&extracted_cx);
368        assert_eq!(traffic_type, TRAFFIC_TYPE_SYNTHETIC);
369    }
370
371    #[test]
372    fn test_default_traffic_type_is_organic() {
373        let cx = Context::current();
374        assert_eq!(get_traffic_type(&cx), TRAFFIC_TYPE_ORGANIC);
375    }
376}