linera_rpc/
propagation.rs1use std::task::{Context as TaskContext, Poll};
31
32use futures::{future::BoxFuture, FutureExt};
33use opentelemetry::{
34 global,
35 propagation::{Extractor, Injector},
36 Context,
37};
38use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
39use tower::{Layer, Service};
40use tracing::warn;
41
42pub const TRAFFIC_TYPE_KEY: &str = "traffic_type";
47
48pub const TRAFFIC_TYPE_ORGANIC: &str = "organic";
50
51pub const TRAFFIC_TYPE_SYNTHETIC: &str = "synthetic";
53
54pub const TRAFFIC_TYPE_UNKNOWN: &str = "unknown";
56
57pub const TRAFFIC_TYPE_ENV_VAR: &str = "LINERA_TRAFFIC_TYPE";
62
63#[derive(Clone, Copy, Debug, Default)]
79pub struct OtelContextLayer;
80
81#[derive(Clone, Debug)]
83pub struct OtelContextService<S> {
84 inner: S,
85}
86
87#[derive(Clone, Debug)]
89pub struct ExtractedOtelContext(pub Context);
90
91pub trait HasOtelContext {
96 fn get_otel_context(&self) -> Option<&ExtractedOtelContext>;
97}
98
99impl<B> HasOtelContext for http::Request<B> {
100 fn get_otel_context(&self) -> Option<&ExtractedOtelContext> {
101 self.extensions().get::<ExtractedOtelContext>()
102 }
103}
104
105impl<T> HasOtelContext for tonic::Request<T> {
106 fn get_otel_context(&self) -> Option<&ExtractedOtelContext> {
107 self.extensions().get::<ExtractedOtelContext>()
108 }
109}
110
111pub fn inject_context(cx: &Context, metadata: &mut MetadataMap) {
122 global::get_text_map_propagator(|propagator| {
123 propagator.inject_context(cx, &mut MetadataInjector(metadata));
124 });
125}
126
127pub fn extract_context(metadata: &MetadataMap) -> Context {
142 global::get_text_map_propagator(|propagator| propagator.extract(&MetadataExtractor(metadata)))
143}
144
145pub fn get_context_with_traffic_type() -> Context {
154 use opentelemetry::{baggage::BaggageExt, Key, KeyValue};
155
156 let cx = Context::current();
157
158 if std::env::var(TRAFFIC_TYPE_ENV_VAR)
159 .map(|v| v == TRAFFIC_TYPE_SYNTHETIC)
160 .unwrap_or(false)
161 {
162 cx.with_baggage(vec![KeyValue::new(
163 Key::new(TRAFFIC_TYPE_KEY),
164 TRAFFIC_TYPE_SYNTHETIC,
165 )])
166 } else {
167 cx
168 }
169}
170
171pub fn get_traffic_type(cx: &Context) -> &'static str {
175 use opentelemetry::baggage::BaggageExt;
176
177 cx.baggage()
178 .get(TRAFFIC_TYPE_KEY)
179 .map(|v| v.as_str())
180 .and_then(|v| {
181 if v == TRAFFIC_TYPE_SYNTHETIC {
182 Some(TRAFFIC_TYPE_SYNTHETIC)
183 } else {
184 None
185 }
186 })
187 .unwrap_or(TRAFFIC_TYPE_ORGANIC)
188}
189
190pub fn get_traffic_type_from_request<R: HasOtelContext>(request: &R) -> &'static str {
195 request
196 .get_otel_context()
197 .map_or(TRAFFIC_TYPE_ORGANIC, |ext| get_traffic_type(&ext.0))
198}
199
200pub fn get_otel_context_from_tonic_request<T>(request: &tonic::Request<T>) -> Option<Context> {
205 request
206 .extensions()
207 .get::<ExtractedOtelContext>()
208 .map(|ext| ext.0.clone())
209}
210
211pub fn create_request_with_context<T>(inner: T, cx: Option<&Context>) -> tonic::Request<T> {
216 let mut request = tonic::Request::new(inner);
217 if let Some(cx) = cx {
218 inject_context(cx, request.metadata_mut());
219 }
220 request
221}
222
223pub fn create_request_with_current_span_context<T>(inner: T) -> tonic::Request<T> {
229 use tracing_opentelemetry::OpenTelemetrySpanExt;
230 let cx = tracing::Span::current().context();
231 create_request_with_context(inner, Some(&cx))
232}
233
234impl<S> Layer<S> for OtelContextLayer {
235 type Service = OtelContextService<S>;
236
237 fn layer(&self, service: S) -> Self::Service {
238 OtelContextService { inner: service }
239 }
240}
241
242impl<S, B> Service<http::Request<B>> for OtelContextService<S>
243where
244 S: Service<http::Request<B>> + Clone + Send + 'static,
245 S::Future: Send,
246 B: Send + 'static,
247{
248 type Response = S::Response;
249 type Error = S::Error;
250 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
251
252 fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
253 self.inner.poll_ready(cx)
254 }
255
256 fn call(&mut self, mut request: http::Request<B>) -> Self::Future {
257 use tracing::Instrument;
258 use tracing_opentelemetry::OpenTelemetrySpanExt;
259
260 let cx = global::get_text_map_propagator(|propagator| {
261 propagator.extract(&HttpHeaderExtractor(request.headers()))
262 });
263
264 request
265 .extensions_mut()
266 .insert(ExtractedOtelContext(cx.clone()));
267
268 let span = tracing::info_span!("grpc_request");
269 span.set_parent(cx);
270
271 let mut inner = self.inner.clone();
272 async move { inner.call(request).await }
273 .instrument(span)
274 .boxed()
275 }
276}
277
278struct MetadataInjector<'a>(&'a mut MetadataMap);
279
280impl Injector for MetadataInjector<'_> {
281 fn set(&mut self, key: &str, value: String) {
282 match MetadataKey::from_bytes(key.as_bytes()) {
283 Ok(key) => match MetadataValue::try_from(&value) {
284 Ok(value) => {
285 self.0.insert(key, value);
286 }
287 Err(error) => {
288 warn!(
289 value,
290 error = format!("{error:#}"),
291 "failed to parse metadata value"
292 );
293 }
294 },
295 Err(error) => {
296 warn!(
297 key,
298 error = format!("{error:#}"),
299 "failed to parse metadata key"
300 );
301 }
302 }
303 }
304}
305
306struct MetadataExtractor<'a>(&'a MetadataMap);
307
308impl Extractor for MetadataExtractor<'_> {
309 fn get(&self, key: &str) -> Option<&str> {
310 self.0.get(key).and_then(|value| value.to_str().ok())
311 }
312
313 fn keys(&self) -> Vec<&str> {
314 self.0
315 .keys()
316 .filter_map(|key| match key {
317 tonic::metadata::KeyRef::Ascii(key) => Some(key.as_str()),
318 tonic::metadata::KeyRef::Binary(_) => None,
319 })
320 .collect()
321 }
322}
323
324struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
325
326impl Extractor for HttpHeaderExtractor<'_> {
327 fn get(&self, key: &str) -> Option<&str> {
328 self.0.get(key).and_then(|v| v.to_str().ok())
329 }
330
331 fn keys(&self) -> Vec<&str> {
332 self.0.keys().map(|k| k.as_str()).collect()
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use opentelemetry::{baggage::BaggageExt, Key, KeyValue};
339
340 use super::*;
341
342 #[test]
343 fn test_inject_and_extract_baggage() {
344 use opentelemetry::propagation::TextMapCompositePropagator;
345 use opentelemetry_sdk::propagation::{BaggagePropagator, TraceContextPropagator};
346
347 let propagator = TextMapCompositePropagator::new(vec![
348 Box::new(TraceContextPropagator::new()),
349 Box::new(BaggagePropagator::new()),
350 ]);
351 global::set_text_map_propagator(propagator);
352
353 let cx = Context::current().with_baggage(vec![KeyValue::new(
354 Key::new(TRAFFIC_TYPE_KEY),
355 TRAFFIC_TYPE_SYNTHETIC,
356 )]);
357
358 let mut metadata = MetadataMap::new();
359 inject_context(&cx, &mut metadata);
360
361 assert!(
362 metadata.get("baggage").is_some(),
363 "baggage header should be present"
364 );
365
366 let extracted_cx = extract_context(&metadata);
367 let traffic_type = get_traffic_type(&extracted_cx);
368 assert_eq!(traffic_type, TRAFFIC_TYPE_SYNTHETIC);
369 }
370
371 #[test]
372 fn test_default_traffic_type_is_organic() {
373 let cx = Context::current();
374 assert_eq!(get_traffic_type(&cx), TRAFFIC_TYPE_ORGANIC);
375 }
376}