async_graphql/extensions/
mod.rs

1//! Extensions for schema
2
3mod analyzer;
4#[cfg(feature = "apollo_persisted_queries")]
5pub mod apollo_persisted_queries;
6#[cfg(feature = "apollo_tracing")]
7mod apollo_tracing;
8#[cfg(feature = "log")]
9mod logger;
10#[cfg(feature = "opentelemetry")]
11mod opentelemetry;
12#[cfg(feature = "tracing")]
13mod tracing;
14
15use std::{
16    any::{Any, TypeId},
17    future::Future,
18    sync::Arc,
19};
20
21use futures_util::{FutureExt, future::BoxFuture, stream::BoxStream};
22
23pub use self::analyzer::Analyzer;
24#[cfg(feature = "apollo_tracing")]
25pub use self::apollo_tracing::ApolloTracing;
26#[cfg(feature = "log")]
27pub use self::logger::Logger;
28#[cfg(feature = "opentelemetry")]
29pub use self::opentelemetry::OpenTelemetry;
30#[cfg(feature = "tracing")]
31pub use self::tracing::Tracing;
32use crate::{
33    Data, DataContext, Error, QueryPathNode, Request, Response, Result, SDLExportOptions,
34    SchemaEnv, ServerError, ServerResult, ValidationResult, Value, Variables,
35    parser::types::{ExecutableDocument, Field},
36};
37
/// Context for extension
///
/// Bundles the three data scopes (query, session, schema) that an extension
/// hook can read from.
pub struct ExtensionContext<'a> {
    /// Schema-scope context data, [`Registry`], and custom directives.
    pub schema_env: &'a SchemaEnv,

    /// Extension-scoped context data shared across all extensions.
    ///
    /// Can be accessed only from hooks that implement the [`Extension`] trait.
    ///
    /// It is created with each new [`Request`] and is empty by default.
    ///
    /// For subscriptions, the session ends when the subscription is closed.
    pub session_data: &'a Data,

    /// Request-scoped context data shared across all resolvers.
    ///
    /// This is a reference to the [`Request::data`](Request) field.
    /// If the request has not been initialized yet, the value is seen as
    /// `None` inside the [`Extension::request`], [`Extension::subscribe`], and
    /// [`Extension::prepare_request`] hooks.
    pub query_data: Option<&'a Data>,
}
60
/// Lets an [`ExtensionContext`] be used wherever a [`DataContext`] is expected.
///
/// Every method forwards to the inherent method of the same name on
/// [`ExtensionContext`].
impl<'a> DataContext<'a> for ExtensionContext<'a> {
    /// Forwards to the inherent [`ExtensionContext::data`].
    fn data<D: Any + Send + Sync>(&self) -> Result<&'a D> {
        // The type-qualified path resolves to the inherent method first, so
        // this does not recurse into the trait method.
        ExtensionContext::data::<D>(self)
    }

    /// Forwards to the inherent [`ExtensionContext::data_unchecked`].
    fn data_unchecked<D: Any + Send + Sync>(&self) -> &'a D {
        ExtensionContext::data_unchecked::<D>(self)
    }

    /// Forwards to the inherent [`ExtensionContext::data_opt`].
    fn data_opt<D: Any + Send + Sync>(&self) -> Option<&'a D> {
        ExtensionContext::data_opt::<D>(self)
    }
}
74
75impl<'a> ExtensionContext<'a> {
76    /// Convert the specified [ExecutableDocument] into a query string.
77    ///
78    /// Usually used for log extension, it can hide secret arguments.
79    pub fn stringify_execute_doc(&self, doc: &ExecutableDocument, variables: &Variables) -> String {
80        self.schema_env
81            .registry
82            .stringify_exec_doc(variables, doc)
83            .unwrap_or_default()
84    }
85
86    /// Returns SDL(Schema Definition Language) of this schema.
87    pub fn sdl(&self) -> String {
88        self.schema_env.registry.export_sdl(Default::default())
89    }
90
91    /// Returns SDL(Schema Definition Language) of this schema with options.
92    pub fn sdl_with_options(&self, options: SDLExportOptions) -> String {
93        self.schema_env.registry.export_sdl(options)
94    }
95
96    /// Gets the global data defined in the `Context` or `Schema`.
97    ///
98    /// If both `Schema` and `Query` have the same data type, the data in the
99    /// `Query` is obtained.
100    ///
101    /// # Errors
102    ///
103    /// Returns a `Error` if the specified type data does not exist.
104    pub fn data<D: Any + Send + Sync>(&self) -> Result<&'a D> {
105        self.data_opt::<D>().ok_or_else(|| {
106            Error::new(format!(
107                "Data `{}` does not exist.",
108                std::any::type_name::<D>()
109            ))
110        })
111    }
112
113    /// Gets the global data defined in the `Context` or `Schema`.
114    ///
115    /// # Panics
116    ///
117    /// It will panic if the specified data type does not exist.
118    pub fn data_unchecked<D: Any + Send + Sync>(&self) -> &'a D {
119        self.data_opt::<D>()
120            .unwrap_or_else(|| panic!("Data `{}` does not exist.", std::any::type_name::<D>()))
121    }
122
123    /// Gets the global data defined in the `Context` or `Schema` or `None` if
124    /// the specified type data does not exist.
125    pub fn data_opt<D: Any + Send + Sync>(&self) -> Option<&'a D> {
126        self.query_data
127            .and_then(|query_data| query_data.get(&TypeId::of::<D>()))
128            .or_else(|| self.session_data.get(&TypeId::of::<D>()))
129            .or_else(|| self.schema_env.data.get(&TypeId::of::<D>()))
130            .and_then(|d| d.downcast_ref::<D>())
131    }
132}
133
/// Parameters for [`Extension::resolve`], describing the field being resolved.
pub struct ResolveInfo<'a> {
    /// Current path node. You can walk the entire path from it.
    pub path_node: &'a QueryPathNode<'a>,

    /// Parent type
    pub parent_type: &'a str,

    /// Current return type, as a qualified name.
    pub return_type: &'a str,

    /// Current field name
    pub name: &'a str,

    /// Current field alias
    pub alias: Option<&'a str>,

    /// If `true`, the current field is for introspection.
    pub is_for_introspection: bool,

    /// Current field
    pub field: &'a Field,
}
157
/// Borrowed future that produces the [`Response`] at the end of the request
/// chain (see [`NextRequest`]).
type RequestFut<'a> = &'a mut (dyn Future<Output = Response> + Send + Unpin);

/// Borrowed future that produces the parsed document at the end of the
/// parse-query chain (see [`NextParseQuery`]).
type ParseFut<'a> = &'a mut (dyn Future<Output = ServerResult<ExecutableDocument>> + Send + Unpin);

/// Borrowed future that produces the validation result at the end of the
/// validation chain (see [`NextValidation`]).
type ValidationFut<'a> =
    &'a mut (dyn Future<Output = Result<ValidationResult, Vec<ServerError>>> + Send + Unpin);

/// Boxed one-shot closure that takes optional [`Data`] and returns the boxed
/// future producing the [`Response`] (see [`NextExecute`]).
type ExecuteFutFactory<'a> = Box<dyn FnOnce(Option<Data>) -> BoxFuture<'a, Response> + Send + 'a>;

/// A future type used to resolve the field
pub type ResolveFut<'a> = &'a mut (dyn Future<Output = ServerResult<Option<Value>>> + Send + Unpin);
169
/// The remainder of an extension chain for request.
pub struct NextRequest<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
    /// Future awaited once the chain is exhausted.
    request_fut: RequestFut<'a>,
}
175
176impl NextRequest<'_> {
177    /// Call the [Extension::request] function of next extension.
178    pub async fn run(self, ctx: &ExtensionContext<'_>) -> Response {
179        if let Some((first, next)) = self.chain.split_first() {
180            first
181                .request(
182                    ctx,
183                    NextRequest {
184                        chain: next,
185                        request_fut: self.request_fut,
186                    },
187                )
188                .await
189        } else {
190            self.request_fut.await
191        }
192    }
193}
194
/// The remainder of an extension chain for subscribe.
pub struct NextSubscribe<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
}
199
200impl NextSubscribe<'_> {
201    /// Call the [Extension::subscribe] function of next extension.
202    pub fn run<'s>(
203        self,
204        ctx: &ExtensionContext<'_>,
205        stream: BoxStream<'s, Response>,
206    ) -> BoxStream<'s, Response> {
207        if let Some((first, next)) = self.chain.split_first() {
208            first.subscribe(ctx, stream, NextSubscribe { chain: next })
209        } else {
210            stream
211        }
212    }
213}
214
/// The remainder of an extension chain for prepare request.
pub struct NextPrepareRequest<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
}
219
220impl NextPrepareRequest<'_> {
221    /// Call the [Extension::prepare_request] function of next extension.
222    pub async fn run(self, ctx: &ExtensionContext<'_>, request: Request) -> ServerResult<Request> {
223        if let Some((first, next)) = self.chain.split_first() {
224            first
225                .prepare_request(ctx, request, NextPrepareRequest { chain: next })
226                .await
227        } else {
228            Ok(request)
229        }
230    }
231}
232
/// The remainder of an extension chain for parse query.
pub struct NextParseQuery<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
    /// Future awaited once the chain is exhausted.
    parse_query_fut: ParseFut<'a>,
}
238
239impl NextParseQuery<'_> {
240    /// Call the [Extension::parse_query] function of next extension.
241    pub async fn run(
242        self,
243        ctx: &ExtensionContext<'_>,
244        query: &str,
245        variables: &Variables,
246    ) -> ServerResult<ExecutableDocument> {
247        if let Some((first, next)) = self.chain.split_first() {
248            first
249                .parse_query(
250                    ctx,
251                    query,
252                    variables,
253                    NextParseQuery {
254                        chain: next,
255                        parse_query_fut: self.parse_query_fut,
256                    },
257                )
258                .await
259        } else {
260            self.parse_query_fut.await
261        }
262    }
263}
264
/// The remainder of an extension chain for validation.
pub struct NextValidation<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
    /// Future awaited once the chain is exhausted.
    validation_fut: ValidationFut<'a>,
}
270
271impl NextValidation<'_> {
272    /// Call the [Extension::validation] function of next extension.
273    pub async fn run(
274        self,
275        ctx: &ExtensionContext<'_>,
276    ) -> Result<ValidationResult, Vec<ServerError>> {
277        if let Some((first, next)) = self.chain.split_first() {
278            first
279                .validation(
280                    ctx,
281                    NextValidation {
282                        chain: next,
283                        validation_fut: self.validation_fut,
284                    },
285                )
286                .await
287        } else {
288            self.validation_fut.await
289        }
290    }
291}
292
/// The remainder of an extension chain for execute.
pub struct NextExecute<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
    /// Called with the accumulated data once the chain is exhausted.
    execute_fut_factory: ExecuteFutFactory<'a>,
    /// Data accumulated from earlier `run_with_data` calls in the chain.
    execute_data: Option<Data>,
}
299
300impl NextExecute<'_> {
301    async fn internal_run(
302        self,
303        ctx: &ExtensionContext<'_>,
304        operation_name: Option<&str>,
305        data: Option<Data>,
306    ) -> Response {
307        let execute_data = match (self.execute_data, data) {
308            (Some(mut data1), Some(data2)) => {
309                data1.merge(data2);
310                Some(data1)
311            }
312            (Some(data), None) => Some(data),
313            (None, Some(data)) => Some(data),
314            (None, None) => None,
315        };
316
317        if let Some((first, next)) = self.chain.split_first() {
318            first
319                .execute(
320                    ctx,
321                    operation_name,
322                    NextExecute {
323                        chain: next,
324                        execute_fut_factory: self.execute_fut_factory,
325                        execute_data,
326                    },
327                )
328                .await
329        } else {
330            (self.execute_fut_factory)(execute_data).await
331        }
332    }
333
334    /// Call the [Extension::execute] function of next extension.
335    pub async fn run(self, ctx: &ExtensionContext<'_>, operation_name: Option<&str>) -> Response {
336        self.internal_run(ctx, operation_name, None).await
337    }
338
339    /// Call the [Extension::execute] function of next extension with context
340    /// data.
341    pub async fn run_with_data(
342        self,
343        ctx: &ExtensionContext<'_>,
344        operation_name: Option<&str>,
345        data: Data,
346    ) -> Response {
347        self.internal_run(ctx, operation_name, Some(data)).await
348    }
349}
350
/// The remainder of an extension chain for resolve.
pub struct NextResolve<'a> {
    /// Extensions that have not been invoked yet, in call order.
    chain: &'a [Arc<dyn Extension>],
    /// Future awaited once the chain is exhausted.
    resolve_fut: ResolveFut<'a>,
}
356
357impl NextResolve<'_> {
358    /// Call the [Extension::resolve] function of next extension.
359    pub async fn run(
360        self,
361        ctx: &ExtensionContext<'_>,
362        info: ResolveInfo<'_>,
363    ) -> ServerResult<Option<Value>> {
364        if let Some((first, next)) = self.chain.split_first() {
365            first
366                .resolve(
367                    ctx,
368                    info,
369                    NextResolve {
370                        chain: next,
371                        resolve_fut: self.resolve_fut,
372                    },
373                )
374                .await
375        } else {
376            self.resolve_fut.await
377        }
378    }
379}
380
/// Represents a GraphQL extension
///
/// Every hook receives a `next` value representing the rest of the extension
/// chain. The default implementation of each hook simply forwards to
/// `next.run(..)`, so an implementor only needs to override the hooks it
/// cares about.
#[async_trait::async_trait]
pub trait Extension: Sync + Send + 'static {
    /// Called at the start of a query/mutation request.
    ///
    /// The default implementation forwards to `next.run(ctx)`.
    async fn request(&self, ctx: &ExtensionContext<'_>, next: NextRequest<'_>) -> Response {
        next.run(ctx).await
    }

    /// Called at a subscribe request.
    ///
    /// The default implementation forwards to `next.run(ctx, stream)`.
    fn subscribe<'s>(
        &self,
        ctx: &ExtensionContext<'_>,
        stream: BoxStream<'s, Response>,
        next: NextSubscribe<'_>,
    ) -> BoxStream<'s, Response> {
        next.run(ctx, stream)
    }

    /// Called when preparing a request.
    ///
    /// The default implementation forwards to `next.run(ctx, request)`.
    async fn prepare_request(
        &self,
        ctx: &ExtensionContext<'_>,
        request: Request,
        next: NextPrepareRequest<'_>,
    ) -> ServerResult<Request> {
        next.run(ctx, request).await
    }

    /// Called when parsing the query.
    ///
    /// The default implementation forwards to `next.run(ctx, query, variables)`.
    async fn parse_query(
        &self,
        ctx: &ExtensionContext<'_>,
        query: &str,
        variables: &Variables,
        next: NextParseQuery<'_>,
    ) -> ServerResult<ExecutableDocument> {
        next.run(ctx, query, variables).await
    }

    /// Called when validating the query.
    ///
    /// The default implementation forwards to `next.run(ctx)`.
    async fn validation(
        &self,
        ctx: &ExtensionContext<'_>,
        next: NextValidation<'_>,
    ) -> Result<ValidationResult, Vec<ServerError>> {
        next.run(ctx).await
    }

    /// Called when executing the query.
    ///
    /// The default implementation forwards to `next.run(ctx, operation_name)`.
    async fn execute(
        &self,
        ctx: &ExtensionContext<'_>,
        operation_name: Option<&str>,
        next: NextExecute<'_>,
    ) -> Response {
        next.run(ctx, operation_name).await
    }

    /// Called when resolving a field.
    ///
    /// The default implementation forwards to `next.run(ctx, info)`.
    async fn resolve(
        &self,
        ctx: &ExtensionContext<'_>,
        info: ResolveInfo<'_>,
        next: NextResolve<'_>,
    ) -> ServerResult<Option<Value>> {
        next.run(ctx, info).await
    }
}
449
/// Extension factory
///
/// Used to create an extension instance.
pub trait ExtensionFactory: Send + Sync + 'static {
    /// Create an extension instance.
    fn create(&self) -> Arc<dyn Extension>;
}
457
/// An ordered list of extensions together with the data needed to build an
/// [`ExtensionContext`] for them.
#[derive(Clone)]
#[doc(hidden)]
pub struct Extensions {
    /// Extensions in the order their hooks are invoked.
    extensions: Vec<Arc<dyn Extension>>,
    /// Exposed to hooks as [`ExtensionContext::schema_env`].
    schema_env: SchemaEnv,
    /// Exposed to hooks as [`ExtensionContext::session_data`].
    session_data: Arc<Data>,
    /// Exposed to hooks as [`ExtensionContext::query_data`]; `None` until
    /// `attach_query_data` is called.
    query_data: Option<Arc<Data>>,
}
466
467#[doc(hidden)]
468impl Extensions {
469    pub(crate) fn new(
470        extensions: impl IntoIterator<Item = Arc<dyn Extension>>,
471        schema_env: SchemaEnv,
472        session_data: Arc<Data>,
473    ) -> Self {
474        Extensions {
475            extensions: extensions.into_iter().collect(),
476            schema_env,
477            session_data,
478            query_data: None,
479        }
480    }
481
482    #[inline]
483    pub(crate) fn attach_query_data(&mut self, data: Arc<Data>) {
484        self.query_data = Some(data);
485    }
486
487    #[inline]
488    pub(crate) fn is_empty(&self) -> bool {
489        self.extensions.is_empty()
490    }
491
492    #[inline]
493    fn create_context(&self) -> ExtensionContext {
494        ExtensionContext {
495            schema_env: &self.schema_env,
496            session_data: &self.session_data,
497            query_data: self.query_data.as_deref(),
498        }
499    }
500
501    pub async fn request(&self, request_fut: RequestFut<'_>) -> Response {
502        let next = NextRequest {
503            chain: &self.extensions,
504            request_fut,
505        };
506        next.run(&self.create_context()).await
507    }
508
509    pub fn subscribe<'s>(&self, stream: BoxStream<'s, Response>) -> BoxStream<'s, Response> {
510        let next = NextSubscribe {
511            chain: &self.extensions,
512        };
513        next.run(&self.create_context(), stream)
514    }
515
516    pub async fn prepare_request(&self, request: Request) -> ServerResult<Request> {
517        let next = NextPrepareRequest {
518            chain: &self.extensions,
519        };
520        next.run(&self.create_context(), request).await
521    }
522
523    pub async fn parse_query(
524        &self,
525        query: &str,
526        variables: &Variables,
527        parse_query_fut: ParseFut<'_>,
528    ) -> ServerResult<ExecutableDocument> {
529        let next = NextParseQuery {
530            chain: &self.extensions,
531            parse_query_fut,
532        };
533        next.run(&self.create_context(), query, variables).await
534    }
535
536    pub async fn validation(
537        &self,
538        validation_fut: ValidationFut<'_>,
539    ) -> Result<ValidationResult, Vec<ServerError>> {
540        let next = NextValidation {
541            chain: &self.extensions,
542            validation_fut,
543        };
544        next.run(&self.create_context()).await
545    }
546
547    pub async fn execute<'a, 'b, F, T>(
548        &'a self,
549        operation_name: Option<&str>,
550        execute_fut_factory: F,
551    ) -> Response
552    where
553        F: FnOnce(Option<Data>) -> T + Send + 'a,
554        T: Future<Output = Response> + Send + 'a,
555    {
556        let next = NextExecute {
557            chain: &self.extensions,
558            execute_fut_factory: Box::new(|data| execute_fut_factory(data).boxed()),
559            execute_data: None,
560        };
561        next.run(&self.create_context(), operation_name).await
562    }
563
564    pub async fn resolve(
565        &self,
566        info: ResolveInfo<'_>,
567        resolve_fut: ResolveFut<'_>,
568    ) -> ServerResult<Option<Value>> {
569        let next = NextResolve {
570            chain: &self.extensions,
571            resolve_fut,
572        };
573        next.run(&self.create_context(), info).await
574    }
575}