1mod analyzer;
4#[cfg(feature = "apollo_persisted_queries")]
5pub mod apollo_persisted_queries;
6#[cfg(feature = "apollo_tracing")]
7mod apollo_tracing;
8#[cfg(feature = "log")]
9mod logger;
10#[cfg(feature = "opentelemetry")]
11mod opentelemetry;
12#[cfg(feature = "tracing")]
13mod tracing;
14
15use std::{
16 any::{Any, TypeId},
17 future::Future,
18 sync::Arc,
19};
20
21use futures_util::{FutureExt, future::BoxFuture, stream::BoxStream};
22
23pub use self::analyzer::Analyzer;
24#[cfg(feature = "apollo_tracing")]
25pub use self::apollo_tracing::ApolloTracing;
26#[cfg(feature = "log")]
27pub use self::logger::Logger;
28#[cfg(feature = "opentelemetry")]
29pub use self::opentelemetry::OpenTelemetry;
30#[cfg(feature = "tracing")]
31pub use self::tracing::Tracing;
32use crate::{
33 Data, DataContext, Error, QueryPathNode, Request, Response, Result, SDLExportOptions,
34 SchemaEnv, ServerError, ServerResult, ValidationResult, Value, Variables,
35 parser::types::{ExecutableDocument, Field},
36};
37
/// Context passed by reference to every extension hook.
///
/// It bundles borrowed handles to the three data stores consulted by
/// [`ExtensionContext::data_opt`]: query data, session data and the data held
/// by the schema environment.
pub struct ExtensionContext<'a> {
    /// Schema environment; its `registry` is used for SDL export and document
    /// stringification, and its `data` map is the last fallback for lookups.
    pub schema_env: &'a SchemaEnv,

    /// Session-scoped data, consulted after `query_data` during lookups.
    pub session_data: &'a Data,

    /// Query-scoped data, consulted first during lookups; `None` when no
    /// query data has been attached.
    pub query_data: Option<&'a Data>,
}
60
61impl<'a> DataContext<'a> for ExtensionContext<'a> {
62 fn data<D: Any + Send + Sync>(&self) -> Result<&'a D> {
63 ExtensionContext::data::<D>(self)
64 }
65
66 fn data_unchecked<D: Any + Send + Sync>(&self) -> &'a D {
67 ExtensionContext::data_unchecked::<D>(self)
68 }
69
70 fn data_opt<D: Any + Send + Sync>(&self) -> Option<&'a D> {
71 ExtensionContext::data_opt::<D>(self)
72 }
73}
74
75impl<'a> ExtensionContext<'a> {
76 pub fn stringify_execute_doc(&self, doc: &ExecutableDocument, variables: &Variables) -> String {
80 self.schema_env
81 .registry
82 .stringify_exec_doc(variables, doc)
83 .unwrap_or_default()
84 }
85
86 pub fn sdl(&self) -> String {
88 self.schema_env.registry.export_sdl(Default::default())
89 }
90
91 pub fn sdl_with_options(&self, options: SDLExportOptions) -> String {
93 self.schema_env.registry.export_sdl(options)
94 }
95
96 pub fn data<D: Any + Send + Sync>(&self) -> Result<&'a D> {
105 self.data_opt::<D>().ok_or_else(|| {
106 Error::new(format!(
107 "Data `{}` does not exist.",
108 std::any::type_name::<D>()
109 ))
110 })
111 }
112
113 pub fn data_unchecked<D: Any + Send + Sync>(&self) -> &'a D {
119 self.data_opt::<D>()
120 .unwrap_or_else(|| panic!("Data `{}` does not exist.", std::any::type_name::<D>()))
121 }
122
123 pub fn data_opt<D: Any + Send + Sync>(&self) -> Option<&'a D> {
126 self.query_data
127 .and_then(|query_data| query_data.get(&TypeId::of::<D>()))
128 .or_else(|| self.session_data.get(&TypeId::of::<D>()))
129 .or_else(|| self.schema_env.data.get(&TypeId::of::<D>()))
130 .and_then(|d| d.downcast_ref::<D>())
131 }
132}
133
/// Description of the field being resolved, passed to [`Extension::resolve`].
pub struct ResolveInfo<'a> {
    /// Path node for the field being resolved.
    pub path_node: &'a QueryPathNode<'a>,

    /// Parent type, as a string.
    pub parent_type: &'a str,

    /// Return type of the field, as a string.
    pub return_type: &'a str,

    /// Field name.
    pub name: &'a str,

    /// Field alias, if one is present.
    pub alias: Option<&'a str>,

    /// Whether the field is resolved for introspection.
    pub is_for_introspection: bool,

    /// The parsed field.
    pub field: &'a Field,
}
157
/// Mutable borrow of the future producing the final [`Response`]; awaited by
/// [`NextRequest::run`] once the extension chain is exhausted.
type RequestFut<'a> = &'a mut (dyn Future<Output = Response> + Send + Unpin);

/// Mutable borrow of the future that yields the parsed document; awaited by
/// [`NextParseQuery::run`] once the extension chain is exhausted.
type ParseFut<'a> = &'a mut (dyn Future<Output = ServerResult<ExecutableDocument>> + Send + Unpin);

/// Mutable borrow of the future that yields the validation result; awaited by
/// [`NextValidation::run`] once the extension chain is exhausted.
type ValidationFut<'a> =
    &'a mut (dyn Future<Output = Result<ValidationResult, Vec<ServerError>>> + Send + Unpin);

/// Boxed one-shot factory that turns the (optional) data accumulated along the
/// chain into the future performing the execution.
type ExecuteFutFactory<'a> = Box<dyn FnOnce(Option<Data>) -> BoxFuture<'a, Response> + Send + 'a>;

/// Mutable borrow of the future that yields the resolved field value; awaited
/// by [`NextResolve::run`] once the extension chain is exhausted.
pub type ResolveFut<'a> = &'a mut (dyn Future<Output = ServerResult<Option<Value>>> + Send + Unpin);
169
170pub struct NextRequest<'a> {
172 chain: &'a [Arc<dyn Extension>],
173 request_fut: RequestFut<'a>,
174}
175
176impl NextRequest<'_> {
177 pub async fn run(self, ctx: &ExtensionContext<'_>) -> Response {
179 if let Some((first, next)) = self.chain.split_first() {
180 first
181 .request(
182 ctx,
183 NextRequest {
184 chain: next,
185 request_fut: self.request_fut,
186 },
187 )
188 .await
189 } else {
190 self.request_fut.await
191 }
192 }
193}
194
195pub struct NextSubscribe<'a> {
197 chain: &'a [Arc<dyn Extension>],
198}
199
200impl NextSubscribe<'_> {
201 pub fn run<'s>(
203 self,
204 ctx: &ExtensionContext<'_>,
205 stream: BoxStream<'s, Response>,
206 ) -> BoxStream<'s, Response> {
207 if let Some((first, next)) = self.chain.split_first() {
208 first.subscribe(ctx, stream, NextSubscribe { chain: next })
209 } else {
210 stream
211 }
212 }
213}
214
215pub struct NextPrepareRequest<'a> {
217 chain: &'a [Arc<dyn Extension>],
218}
219
220impl NextPrepareRequest<'_> {
221 pub async fn run(self, ctx: &ExtensionContext<'_>, request: Request) -> ServerResult<Request> {
223 if let Some((first, next)) = self.chain.split_first() {
224 first
225 .prepare_request(ctx, request, NextPrepareRequest { chain: next })
226 .await
227 } else {
228 Ok(request)
229 }
230 }
231}
232
233pub struct NextParseQuery<'a> {
235 chain: &'a [Arc<dyn Extension>],
236 parse_query_fut: ParseFut<'a>,
237}
238
239impl NextParseQuery<'_> {
240 pub async fn run(
242 self,
243 ctx: &ExtensionContext<'_>,
244 query: &str,
245 variables: &Variables,
246 ) -> ServerResult<ExecutableDocument> {
247 if let Some((first, next)) = self.chain.split_first() {
248 first
249 .parse_query(
250 ctx,
251 query,
252 variables,
253 NextParseQuery {
254 chain: next,
255 parse_query_fut: self.parse_query_fut,
256 },
257 )
258 .await
259 } else {
260 self.parse_query_fut.await
261 }
262 }
263}
264
265pub struct NextValidation<'a> {
267 chain: &'a [Arc<dyn Extension>],
268 validation_fut: ValidationFut<'a>,
269}
270
271impl NextValidation<'_> {
272 pub async fn run(
274 self,
275 ctx: &ExtensionContext<'_>,
276 ) -> Result<ValidationResult, Vec<ServerError>> {
277 if let Some((first, next)) = self.chain.split_first() {
278 first
279 .validation(
280 ctx,
281 NextValidation {
282 chain: next,
283 validation_fut: self.validation_fut,
284 },
285 )
286 .await
287 } else {
288 self.validation_fut.await
289 }
290 }
291}
292
293pub struct NextExecute<'a> {
295 chain: &'a [Arc<dyn Extension>],
296 execute_fut_factory: ExecuteFutFactory<'a>,
297 execute_data: Option<Data>,
298}
299
300impl NextExecute<'_> {
301 async fn internal_run(
302 self,
303 ctx: &ExtensionContext<'_>,
304 operation_name: Option<&str>,
305 data: Option<Data>,
306 ) -> Response {
307 let execute_data = match (self.execute_data, data) {
308 (Some(mut data1), Some(data2)) => {
309 data1.merge(data2);
310 Some(data1)
311 }
312 (Some(data), None) => Some(data),
313 (None, Some(data)) => Some(data),
314 (None, None) => None,
315 };
316
317 if let Some((first, next)) = self.chain.split_first() {
318 first
319 .execute(
320 ctx,
321 operation_name,
322 NextExecute {
323 chain: next,
324 execute_fut_factory: self.execute_fut_factory,
325 execute_data,
326 },
327 )
328 .await
329 } else {
330 (self.execute_fut_factory)(execute_data).await
331 }
332 }
333
334 pub async fn run(self, ctx: &ExtensionContext<'_>, operation_name: Option<&str>) -> Response {
336 self.internal_run(ctx, operation_name, None).await
337 }
338
339 pub async fn run_with_data(
342 self,
343 ctx: &ExtensionContext<'_>,
344 operation_name: Option<&str>,
345 data: Data,
346 ) -> Response {
347 self.internal_run(ctx, operation_name, Some(data)).await
348 }
349}
350
351pub struct NextResolve<'a> {
353 chain: &'a [Arc<dyn Extension>],
354 resolve_fut: ResolveFut<'a>,
355}
356
357impl NextResolve<'_> {
358 pub async fn run(
360 self,
361 ctx: &ExtensionContext<'_>,
362 info: ResolveInfo<'_>,
363 ) -> ServerResult<Option<Value>> {
364 if let Some((first, next)) = self.chain.split_first() {
365 first
366 .resolve(
367 ctx,
368 info,
369 NextResolve {
370 chain: next,
371 resolve_fut: self.resolve_fut,
372 },
373 )
374 .await
375 } else {
376 self.resolve_fut.await
377 }
378 }
379}
380
/// Middleware-style hook points around the stages of handling a request.
///
/// Every method has a default implementation that simply forwards to `next`,
/// so an implementor only overrides the stages it cares about. An override
/// decides whether, and when, to continue the chain by calling `next.run(..)`.
#[async_trait::async_trait]
pub trait Extension: Sync + Send + 'static {
    /// Hook around a whole request. The default forwards to `next`.
    async fn request(&self, ctx: &ExtensionContext<'_>, next: NextRequest<'_>) -> Response {
        next.run(ctx).await
    }

    /// Hook that may wrap or replace a subscription's response stream. The
    /// default forwards the stream to `next`.
    fn subscribe<'s>(
        &self,
        ctx: &ExtensionContext<'_>,
        stream: BoxStream<'s, Response>,
        next: NextSubscribe<'_>,
    ) -> BoxStream<'s, Response> {
        next.run(ctx, stream)
    }

    /// Hook that may inspect, modify or reject the incoming [`Request`]. The
    /// default forwards the request to `next`.
    async fn prepare_request(
        &self,
        ctx: &ExtensionContext<'_>,
        request: Request,
        next: NextPrepareRequest<'_>,
    ) -> ServerResult<Request> {
        next.run(ctx, request).await
    }

    /// Hook around query parsing. The default forwards to `next`.
    async fn parse_query(
        &self,
        ctx: &ExtensionContext<'_>,
        query: &str,
        variables: &Variables,
        next: NextParseQuery<'_>,
    ) -> ServerResult<ExecutableDocument> {
        next.run(ctx, query, variables).await
    }

    /// Hook around validation. The default forwards to `next`.
    async fn validation(
        &self,
        ctx: &ExtensionContext<'_>,
        next: NextValidation<'_>,
    ) -> Result<ValidationResult, Vec<ServerError>> {
        next.run(ctx).await
    }

    /// Hook around execution of an operation. The default forwards to `next`
    /// without contributing data; see [`NextExecute::run_with_data`] for the
    /// variant that does.
    async fn execute(
        &self,
        ctx: &ExtensionContext<'_>,
        operation_name: Option<&str>,
        next: NextExecute<'_>,
    ) -> Response {
        next.run(ctx, operation_name).await
    }

    /// Hook around the resolution of a single field described by `info`. The
    /// default forwards to `next`.
    async fn resolve(
        &self,
        ctx: &ExtensionContext<'_>,
        info: ResolveInfo<'_>,
        next: NextResolve<'_>,
    ) -> ServerResult<Option<Value>> {
        next.run(ctx, info).await
    }
}
449
/// Factory for [`Extension`] instances.
pub trait ExtensionFactory: Send + Sync + 'static {
    /// Creates an extension instance behind a shared pointer.
    // NOTE(review): presumably called once per request/session so that each
    // gets its own extension state; confirm against the caller.
    fn create(&self) -> Arc<dyn Extension>;
}
457
/// Ordered set of extensions together with the data needed to build an
/// [`ExtensionContext`] for them.
#[derive(Clone)]
#[doc(hidden)]
pub struct Extensions {
    /// Extensions in the order their hooks are invoked.
    extensions: Vec<Arc<dyn Extension>>,
    /// Schema environment lent to each context.
    schema_env: SchemaEnv,
    /// Session data lent to each context.
    session_data: Arc<Data>,
    /// Query data lent to each context; `None` until
    /// `attach_query_data` is called.
    query_data: Option<Arc<Data>>,
}
466
467#[doc(hidden)]
468impl Extensions {
469 pub(crate) fn new(
470 extensions: impl IntoIterator<Item = Arc<dyn Extension>>,
471 schema_env: SchemaEnv,
472 session_data: Arc<Data>,
473 ) -> Self {
474 Extensions {
475 extensions: extensions.into_iter().collect(),
476 schema_env,
477 session_data,
478 query_data: None,
479 }
480 }
481
482 #[inline]
483 pub(crate) fn attach_query_data(&mut self, data: Arc<Data>) {
484 self.query_data = Some(data);
485 }
486
487 #[inline]
488 pub(crate) fn is_empty(&self) -> bool {
489 self.extensions.is_empty()
490 }
491
492 #[inline]
493 fn create_context(&self) -> ExtensionContext {
494 ExtensionContext {
495 schema_env: &self.schema_env,
496 session_data: &self.session_data,
497 query_data: self.query_data.as_deref(),
498 }
499 }
500
501 pub async fn request(&self, request_fut: RequestFut<'_>) -> Response {
502 let next = NextRequest {
503 chain: &self.extensions,
504 request_fut,
505 };
506 next.run(&self.create_context()).await
507 }
508
509 pub fn subscribe<'s>(&self, stream: BoxStream<'s, Response>) -> BoxStream<'s, Response> {
510 let next = NextSubscribe {
511 chain: &self.extensions,
512 };
513 next.run(&self.create_context(), stream)
514 }
515
516 pub async fn prepare_request(&self, request: Request) -> ServerResult<Request> {
517 let next = NextPrepareRequest {
518 chain: &self.extensions,
519 };
520 next.run(&self.create_context(), request).await
521 }
522
523 pub async fn parse_query(
524 &self,
525 query: &str,
526 variables: &Variables,
527 parse_query_fut: ParseFut<'_>,
528 ) -> ServerResult<ExecutableDocument> {
529 let next = NextParseQuery {
530 chain: &self.extensions,
531 parse_query_fut,
532 };
533 next.run(&self.create_context(), query, variables).await
534 }
535
536 pub async fn validation(
537 &self,
538 validation_fut: ValidationFut<'_>,
539 ) -> Result<ValidationResult, Vec<ServerError>> {
540 let next = NextValidation {
541 chain: &self.extensions,
542 validation_fut,
543 };
544 next.run(&self.create_context()).await
545 }
546
547 pub async fn execute<'a, 'b, F, T>(
548 &'a self,
549 operation_name: Option<&str>,
550 execute_fut_factory: F,
551 ) -> Response
552 where
553 F: FnOnce(Option<Data>) -> T + Send + 'a,
554 T: Future<Output = Response> + Send + 'a,
555 {
556 let next = NextExecute {
557 chain: &self.extensions,
558 execute_fut_factory: Box::new(|data| execute_fut_factory(data).boxed()),
559 execute_data: None,
560 };
561 next.run(&self.create_context(), operation_name).await
562 }
563
564 pub async fn resolve(
565 &self,
566 info: ResolveInfo<'_>,
567 resolve_fut: ResolveFut<'_>,
568 ) -> ServerResult<Option<Value>> {
569 let next = NextResolve {
570 chain: &self.extensions,
571 resolve_fut,
572 };
573 next.run(&self.create_context(), info).await
574 }
575}