// tonic/transport/server/mod.rs

1//! Server implementation and builder.
2
3mod conn;
4mod display_error_stack;
5mod incoming;
6mod io_stream;
7mod service;
8#[cfg(feature = "_tls-any")]
9mod tls;
10#[cfg(unix)]
11mod unix;
12
13use tokio_stream::StreamExt as _;
14use tracing::{debug, trace};
15
16#[cfg(feature = "router")]
17use crate::{server::NamedService, service::Routes};
18
19#[cfg(feature = "router")]
20use std::convert::Infallible;
21
22pub use conn::{Connected, TcpConnectInfo};
23use hyper_util::{
24    rt::{TokioExecutor, TokioIo, TokioTimer},
25    server::conn::auto::{Builder as ConnectionBuilder, HttpServerConnExec},
26    service::TowerToHyperService,
27};
28#[cfg(feature = "_tls-any")]
29pub use tls::ServerTlsConfig;
30
31#[cfg(feature = "_tls-any")]
32pub use conn::TlsConnectInfo;
33
34#[cfg(feature = "_tls-any")]
35use self::service::TlsAcceptor;
36
37#[cfg(unix)]
38pub use unix::UdsConnectInfo;
39
40pub use incoming::TcpIncoming;
41
42#[cfg(feature = "_tls-any")]
43use crate::transport::Error;
44
45use self::service::{ConnectInfoLayer, ServerIo};
46use super::service::GrpcTimeout;
47use crate::body::Body;
48use crate::service::RecoverErrorLayer;
49use crate::transport::server::display_error_stack::DisplayErrorStack;
50use bytes::Bytes;
51use http::{Request, Response};
52use http_body_util::BodyExt;
53use hyper::{body::Incoming, service::Service as HyperService};
54use pin_project::pin_project;
55use std::{
56    fmt,
57    future::{self, Future},
58    marker::PhantomData,
59    net::SocketAddr,
60    pin::{pin, Pin},
61    sync::Arc,
62    task::{ready, Context, Poll},
63    time::Duration,
64};
65use tokio::io::{AsyncRead, AsyncWrite};
66use tokio_stream::Stream;
67use tower::{
68    layer::util::{Identity, Stack},
69    layer::Layer,
70    limit::concurrency::ConcurrencyLimitLayer,
71    load_shed::LoadShedLayer,
72    util::BoxCloneService,
73    Service, ServiceBuilder, ServiceExt,
74};
75
76type BoxService = tower::util::BoxCloneService<Request<Body>, Response<Body>, crate::BoxError>;
77type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
78
79const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
80
81/// A default batteries included `transport` server.
82///
83/// This provides an easy builder pattern style builder [`Server`] on top of
84/// `hyper` connections. This builder exposes easy configuration parameters
85/// for providing a fully featured http2 based gRPC server. This should provide
86/// a very good out of the box http2 server for use with tonic but is also a
87/// reference implementation that should be a good starting point for anyone
88/// wanting to create a more complex and/or specific implementation.
89#[derive(Clone)]
90pub struct Server<L = Identity> {
91    trace_interceptor: Option<TraceInterceptor>,
92    concurrency_limit: Option<usize>,
93    load_shed: bool,
94    timeout: Option<Duration>,
95    #[cfg(feature = "_tls-any")]
96    tls: Option<TlsAcceptor>,
97    init_stream_window_size: Option<u32>,
98    init_connection_window_size: Option<u32>,
99    max_concurrent_streams: Option<u32>,
100    tcp_keepalive: Option<Duration>,
101    tcp_nodelay: bool,
102    http2_keepalive_interval: Option<Duration>,
103    http2_keepalive_timeout: Duration,
104    http2_adaptive_window: Option<bool>,
105    http2_max_pending_accept_reset_streams: Option<usize>,
106    http2_max_header_list_size: Option<u32>,
107    max_frame_size: Option<u32>,
108    accept_http1: bool,
109    service_builder: ServiceBuilder<L>,
110    max_connection_age: Option<Duration>,
111}
112
impl Default for Server<Identity> {
    // Note: `Server::builder()` deliberately diverges from this impl by
    // enabling `tcp_nodelay`; everything else matches these defaults.
    fn default() -> Self {
        Self {
            // Middleware / request handling: no tracing span factory, no
            // concurrency limit, buffering (no load shedding), no timeout.
            trace_interceptor: None,
            concurrency_limit: None,
            load_shed: false,
            timeout: None,
            #[cfg(feature = "_tls-any")]
            tls: None,
            // HTTP/2 flow control and stream limits: `None` defers to the
            // underlying hyper/h2 defaults.
            init_stream_window_size: None,
            init_connection_window_size: None,
            max_concurrent_streams: None,
            // TCP socket options (only applied when this server binds the
            // listener itself, not with `serve_with_incoming`).
            tcp_keepalive: None,
            tcp_nodelay: false,
            // HTTP/2 keepalive pings are off by default; the timeout only
            // matters once an interval is configured.
            http2_keepalive_interval: None,
            http2_keepalive_timeout: DEFAULT_HTTP2_KEEPALIVE_TIMEOUT,
            http2_adaptive_window: None,
            http2_max_pending_accept_reset_streams: None,
            http2_max_header_list_size: None,
            max_frame_size: None,
            // HTTP/2 only by default; http1 is opt-in for grpc-web setups.
            accept_http1: false,
            service_builder: Default::default(),
            max_connection_age: None,
        }
    }
}
139
/// A stack based [`Service`] router.
#[cfg(feature = "router")]
#[derive(Debug)]
pub struct Router<L = Identity> {
    // The configured server builder (transport, HTTP/2 and middleware
    // settings) used when this router is served.
    server: Server<L>,
    // The set of services registered so far.
    routes: Routes,
}
147
148impl Server {
149    /// Create a new server builder that can configure a [`Server`].
150    pub fn builder() -> Self {
151        Server {
152            tcp_nodelay: true,
153            accept_http1: false,
154            ..Default::default()
155        }
156    }
157}
158
159impl<L> Server<L> {
160    /// Configure TLS for this server.
161    #[cfg(feature = "_tls-any")]
162    pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
163        Ok(Server {
164            tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
165            ..self
166        })
167    }
168
169    /// Set the concurrency limit applied to on requests inbound per connection.
170    ///
171    /// # Example
172    ///
173    /// ```
174    /// # use tonic::transport::Server;
175    /// # use tower_service::Service;
176    /// # let builder = Server::builder();
177    /// builder.concurrency_limit_per_connection(32);
178    /// ```
179    #[must_use]
180    pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
181        Server {
182            concurrency_limit: Some(limit),
183            ..self
184        }
185    }
186
187    /// Enable or disable load shedding. The default is disabled.
188    ///
189    /// When load shedding is enabled, if the service responds with not ready
190    /// the request will immediately be rejected with a
191    /// [`resource_exhausted`](https://docs.rs/tonic/latest/tonic/struct.Status.html#method.resource_exhausted) error.
192    /// The default is to buffer requests. This is especially useful in combination with
193    /// setting a concurrency limit per connection.
194    ///
195    /// # Example
196    ///
197    /// ```
198    /// # use tonic::transport::Server;
199    /// # use tower_service::Service;
200    /// # let builder = Server::builder();
201    /// builder.load_shed(true);
202    /// ```
203    #[must_use]
204    pub fn load_shed(self, load_shed: bool) -> Self {
205        Server { load_shed, ..self }
206    }
207
208    /// Set a timeout on for all request handlers.
209    ///
210    /// # Example
211    ///
212    /// ```
213    /// # use tonic::transport::Server;
214    /// # use tower_service::Service;
215    /// # use std::time::Duration;
216    /// # let builder = Server::builder();
217    /// builder.timeout(Duration::from_secs(30));
218    /// ```
219    #[must_use]
220    pub fn timeout(self, timeout: Duration) -> Self {
221        Server {
222            timeout: Some(timeout),
223            ..self
224        }
225    }
226
227    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
228    /// stream-level flow control.
229    ///
230    /// Default is 65,535
231    ///
232    /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
233    #[must_use]
234    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
235        Server {
236            init_stream_window_size: sz.into(),
237            ..self
238        }
239    }
240
241    /// Sets the max connection-level flow control for HTTP2
242    ///
243    /// Default is 65,535
244    #[must_use]
245    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
246        Server {
247            init_connection_window_size: sz.into(),
248            ..self
249        }
250    }
251
252    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
253    /// connections.
254    ///
255    /// Default is no limit (`None`).
256    ///
257    /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
258    #[must_use]
259    pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
260        Server {
261            max_concurrent_streams: max.into(),
262            ..self
263        }
264    }
265
266    /// Sets the maximum time option in milliseconds that a connection may exist
267    ///
268    /// Default is no limit (`None`).
269    ///
270    /// # Example
271    ///
272    /// ```
273    /// # use tonic::transport::Server;
274    /// # use tower_service::Service;
275    /// # use std::time::Duration;
276    /// # let builder = Server::builder();
277    /// builder.max_connection_age(Duration::from_secs(60));
278    /// ```
279    #[must_use]
280    pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
281        Server {
282            max_connection_age: Some(max_connection_age),
283            ..self
284        }
285    }
286
287    /// Set whether HTTP2 Ping frames are enabled on accepted connections.
288    ///
289    /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
290    /// specified will be the time interval between HTTP2 Ping frames.
291    /// The timeout for receiving an acknowledgement of the keepalive ping
292    /// can be set with [`Server::http2_keepalive_timeout`].
293    ///
294    /// Default is no HTTP2 keepalive (`None`)
295    ///
296    #[must_use]
297    pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
298        Server {
299            http2_keepalive_interval,
300            ..self
301        }
302    }
303
304    /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
305    ///
306    /// If the ping is not acknowledged within the timeout, the connection will be closed.
307    /// Does nothing if http2_keep_alive_interval is disabled.
308    ///
309    /// Default is 20 seconds.
310    ///
311    #[must_use]
312    pub fn http2_keepalive_timeout(mut self, http2_keepalive_timeout: Option<Duration>) -> Self {
313        if let Some(timeout) = http2_keepalive_timeout {
314            self.http2_keepalive_timeout = timeout;
315        }
316        self
317    }
318
319    /// Sets whether to use an adaptive flow control. Defaults to false.
320    /// Enabling this will override the limits set in http2_initial_stream_window_size and
321    /// http2_initial_connection_window_size.
322    #[must_use]
323    pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
324        Server {
325            http2_adaptive_window: enabled,
326            ..self
327        }
328    }
329
330    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
331    ///
332    /// This will default to whatever the default in h2 is. As of v0.3.17, it is 20.
333    ///
334    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
335    #[must_use]
336    pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
337        Server {
338            http2_max_pending_accept_reset_streams: max,
339            ..self
340        }
341    }
342
343    /// Set whether TCP keepalive messages are enabled on accepted connections.
344    ///
345    /// If `None` is specified, keepalive is disabled, otherwise the duration
346    /// specified will be the time to remain idle before sending TCP keepalive
347    /// probes.
348    ///
349    /// Important: This setting is only respected when not using `serve_with_incoming`.
350    ///
351    /// Default is no keepalive (`None`)
352    ///
353    #[must_use]
354    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
355        Server {
356            tcp_keepalive,
357            ..self
358        }
359    }
360
361    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
362    #[must_use]
363    pub fn tcp_nodelay(self, enabled: bool) -> Self {
364        Server {
365            tcp_nodelay: enabled,
366            ..self
367        }
368    }
369
370    /// Sets the max size of received header frames.
371    ///
372    /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
373    #[must_use]
374    pub fn http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self {
375        Server {
376            http2_max_header_list_size: max.into(),
377            ..self
378        }
379    }
380
381    /// Sets the maximum frame size to use for HTTP2.
382    ///
383    /// Passing `None` will do nothing.
384    ///
385    /// If not set, will default from underlying transport.
386    #[must_use]
387    pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
388        Server {
389            max_frame_size: frame_size.into(),
390            ..self
391        }
392    }
393
394    /// Allow this server to accept http1 requests.
395    ///
396    /// Accepting http1 requests is only useful when developing `grpc-web`
397    /// enabled services. If this setting is set to `true` but services are
398    /// not correctly configured to handle grpc-web requests, your server may
399    /// return confusing (but correct) protocol errors.
400    ///
401    /// Default is `false`.
402    #[must_use]
403    pub fn accept_http1(self, accept_http1: bool) -> Self {
404        Server {
405            accept_http1,
406            ..self
407        }
408    }
409
410    /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
411    #[must_use]
412    pub fn trace_fn<F>(self, f: F) -> Self
413    where
414        F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
415    {
416        Server {
417            trace_interceptor: Some(Arc::new(f)),
418            ..self
419        }
420    }
421
422    /// Create a router with the `S` typed service as the first service.
423    ///
424    /// This will clone the `Server` builder and create a router that will
425    /// route around different services.
426    #[cfg(feature = "router")]
427    pub fn add_service<S>(&mut self, svc: S) -> Router<L>
428    where
429        S: Service<Request<Body>, Error = Infallible>
430            + NamedService
431            + Clone
432            + Send
433            + Sync
434            + 'static,
435        S::Response: axum::response::IntoResponse,
436        S::Future: Send + 'static,
437        L: Clone,
438    {
439        Router::new(self.clone(), Routes::new(svc))
440    }
441
442    /// Create a router with the optional `S` typed service as the first service.
443    ///
444    /// This will clone the `Server` builder and create a router that will
445    /// route around different services.
446    ///
447    /// # Note
448    /// Even when the argument given is `None` this will capture *all* requests to this service name.
449    /// As a result, one cannot use this to toggle between two identically named implementations.
450    #[cfg(feature = "router")]
451    pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
452    where
453        S: Service<Request<Body>, Error = Infallible>
454            + NamedService
455            + Clone
456            + Send
457            + Sync
458            + 'static,
459        S::Response: axum::response::IntoResponse,
460        S::Future: Send + 'static,
461        L: Clone,
462    {
463        let routes = svc.map(Routes::new).unwrap_or_default();
464        Router::new(self.clone(), routes)
465    }
466
    /// Create a router with given [`Routes`].
    ///
    /// This will clone the `Server` builder and create a router that will
    /// route around different services that were already added to the provided `routes`.
    #[cfg(feature = "router")]
    pub fn add_routes(&mut self, routes: Routes) -> Router<L>
    where
        L: Clone,
    {
        Router::new(self.clone(), routes)
    }
478
    /// Set the [Tower] [`Layer`] all services will be wrapped in.
    ///
    /// This enables using middleware from the [Tower ecosystem][eco].
    ///
    /// # Example
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// use tower::timeout::TimeoutLayer;
    /// use std::time::Duration;
    ///
    /// # let mut builder = Server::builder();
    /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
    /// ```
    ///
    /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
    /// here as an example.
    ///
    /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
    /// [interceptors]:
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// use tower::ServiceBuilder;
    /// use std::time::Duration;
    /// use tonic::{Request, Status, service::InterceptorLayer};
    ///
    /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    ///     if valid_credentials(&request) {
    ///         Ok(request)
    ///     } else {
    ///         Err(Status::unauthenticated("invalid credentials"))
    ///     }
    /// }
    ///
    /// fn valid_credentials(request: &Request<()>) -> bool {
    ///     // ...
    ///     # true
    /// }
    ///
    /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    ///     Ok(request)
    /// }
    ///
    /// let layer = ServiceBuilder::new()
    ///     .load_shed()
    ///     .timeout(Duration::from_secs(30))
    ///     .layer(InterceptorLayer::new(auth_interceptor))
    ///     .layer(InterceptorLayer::new(some_other_interceptor))
    ///     .into_inner();
    ///
    /// Server::builder().layer(layer);
    /// ```
    ///
    /// [Tower]: https://github.com/tower-rs/tower
    /// [`Layer`]: tower::layer::Layer
    /// [eco]: https://github.com/tower-rs
    /// [`ServiceBuilder`]: tower::ServiceBuilder
    /// [interceptors]: crate::service::Interceptor
    pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
        // Pushing a layer changes the type parameter `L` to
        // `Stack<NewLayer, L>`, so `..self` struct-update syntax cannot be
        // used; every field must be copied across explicitly.
        Server {
            service_builder: self.service_builder.layer(new_layer),
            trace_interceptor: self.trace_interceptor,
            concurrency_limit: self.concurrency_limit,
            load_shed: self.load_shed,
            timeout: self.timeout,
            #[cfg(feature = "_tls-any")]
            tls: self.tls,
            init_stream_window_size: self.init_stream_window_size,
            init_connection_window_size: self.init_connection_window_size,
            max_concurrent_streams: self.max_concurrent_streams,
            tcp_keepalive: self.tcp_keepalive,
            tcp_nodelay: self.tcp_nodelay,
            http2_keepalive_interval: self.http2_keepalive_interval,
            http2_keepalive_timeout: self.http2_keepalive_timeout,
            http2_adaptive_window: self.http2_adaptive_window,
            http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
            http2_max_header_list_size: self.http2_max_header_list_size,
            max_frame_size: self.max_frame_size,
            accept_http1: self.accept_http1,
            max_connection_age: self.max_connection_age,
        }
    }
564
565    fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
566        Ok(TcpIncoming::bind(addr)
567            .map_err(super::Error::from_source)?
568            .with_nodelay(Some(self.tcp_nodelay))
569            .with_keepalive(self.tcp_keepalive))
570    }
571
    /// Serve the service.
    ///
    /// Binds a TCP listener on `addr` (applying the configured nodelay and
    /// keepalive options) and serves `svc` on each accepted connection.
    pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send + 'static,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let incoming = self.bind_incoming(addr)?;
        self.serve_with_incoming(svc, incoming).await
    }
586
    /// Serve the service with the shutdown signal.
    ///
    /// Like [`Server::serve`], but stops accepting new connections once
    /// `signal` resolves and then waits for in-flight connections to finish.
    pub async fn serve_with_shutdown<S, F, ResBody>(
        self,
        addr: SocketAddr,
        svc: S,
        signal: F,
    ) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send + 'static,
        F: Future<Output = ()>,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let incoming = self.bind_incoming(addr)?;
        self.serve_with_incoming_shutdown(svc, incoming, signal)
            .await
    }
608
    /// Serve the service on the provided incoming stream.
    ///
    /// Note: the builder's TCP socket options (nodelay, keepalive) are not
    /// applied here, since the caller owns the accepted IO.
    pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
        self,
        svc: S,
        incoming: I,
    ) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send + 'static,
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IE: Into<crate::BoxError>,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        // No shutdown signal: run until `incoming` is exhausted. The turbofish
        // pins down `F` since no signal future is provided.
        self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
            .await
    }
630
    /// Serve the service with the signal on the provided incoming stream.
    ///
    /// Stops accepting new connections once `signal` resolves, then waits for
    /// in-flight connections to finish before returning.
    pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
        self,
        svc: S,
        incoming: I,
        signal: F,
    ) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send + 'static,
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IE: Into<crate::BoxError>,
        F: Future<Output = ()>,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        self.serve_internal(svc, incoming, Some(signal)).await
    }
653
    /// Shared implementation behind all `serve*` entry points.
    ///
    /// Accepts connections from `incoming`, wraps `svc` with the configured
    /// middleware, and spawns a task per connection. When `signal` is `Some`,
    /// the accept loop stops once it resolves and the method then waits for
    /// all in-flight connections to close (graceful shutdown).
    async fn serve_internal<S, I, F, IO, IE, ResBody>(
        self,
        svc: S,
        incoming: I,
        signal: Option<F>,
    ) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send + 'static,
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IE: Into<crate::BoxError>,
        F: Future<Output = ()>,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        // Pull configuration out of `self` up front; `self` is partially
        // moved below (`service_builder`, `tls`).
        let trace_interceptor = self.trace_interceptor.clone();
        let concurrency_limit = self.concurrency_limit;
        let load_shed = self.load_shed;
        let init_connection_window_size = self.init_connection_window_size;
        let init_stream_window_size = self.init_stream_window_size;
        let max_concurrent_streams = self.max_concurrent_streams;
        let timeout = self.timeout;
        let max_header_list_size = self.http2_max_header_list_size;
        let max_frame_size = self.max_frame_size;
        // `accept_http1` is the inverse of hyper's "http2 only" mode.
        let http2_only = !self.accept_http1;

        let http2_keepalive_interval = self.http2_keepalive_interval;
        let http2_keepalive_timeout = self.http2_keepalive_timeout;
        let http2_adaptive_window = self.http2_adaptive_window;
        let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
        let max_connection_age = self.max_connection_age;

        // Wrap the user's service with the configured tower middleware stack.
        let svc = self.service_builder.service(svc);

        let incoming = io_stream::ServerIoStream::new(
            incoming,
            #[cfg(feature = "_tls-any")]
            self.tls,
        );
        // Per-connection service factory; `call(&io)` below builds the
        // request service for each accepted connection.
        let mut svc = MakeSvc {
            inner: svc,
            concurrency_limit,
            load_shed,
            timeout,
            trace_interceptor,
            _io: PhantomData,
        };

        // Build the hyper connection builder with all HTTP/2 settings applied.
        let server = {
            let mut builder = ConnectionBuilder::new(TokioExecutor::new());

            if http2_only {
                builder = builder.http2_only();
            }

            builder
                .http2()
                .timer(TokioTimer::new())
                .initial_connection_window_size(init_connection_window_size)
                .initial_stream_window_size(init_stream_window_size)
                .max_concurrent_streams(max_concurrent_streams)
                .keep_alive_interval(http2_keepalive_interval)
                .keep_alive_timeout(http2_keepalive_timeout)
                .adaptive_window(http2_adaptive_window.unwrap_or_default())
                .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
                .max_frame_size(max_frame_size);

            if let Some(max_header_list_size) = max_header_list_size {
                builder.http2().max_header_list_size(max_header_list_size);
            }

            builder
        };

        // Each connection task optionally holds a clone of `signal_rx`; once
        // every receiver is dropped, `signal_tx.closed()` below resolves.
        let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
        let signal_tx = Arc::new(signal_tx);

        let graceful = signal.is_some();
        let mut sig = pin!(Fuse { inner: signal });
        let mut incoming = pin!(incoming);

        // Accept loop: stops on the shutdown signal or when `incoming` ends.
        loop {
            tokio::select! {
                _ = &mut sig => {
                    trace!("signal received, shutting down");
                    break;
                },
                io = incoming.next() => {
                    let io = match io {
                        Some(Ok(io)) => io,
                        Some(Err(e)) => {
                            // Accept errors are non-fatal; log and keep going.
                            trace!("error accepting connection: {}", DisplayErrorStack(&*e));
                            continue;
                        },
                        None => {
                            break
                        },
                    };

                    trace!("connection accepted");

                    let req_svc = svc
                        .call(&io)
                        .await
                        .map_err(super::Error::from_source)?;

                    let hyper_io = TokioIo::new(io);
                    let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(Body::new)));

                    serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age);
                }
            }
        }

        if graceful {
            let _ = signal_tx.send(());
            // Drop our own receiver so only connection tasks keep the
            // channel open.
            drop(signal_rx);
            trace!(
                "waiting for {} connections to close",
                signal_tx.receiver_count()
            );

            // Wait for all connections to close
            signal_tx.closed().await;
        }

        Ok(())
    }
786}
787
// This is moved to its own function as a way to get around
// https://github.com/rust-lang/rust/issues/102211
//
// Spawns a task that serves a single accepted connection until it finishes,
// initiating a graceful shutdown when either the server-wide shutdown
// `watcher` fires or `max_connection_age` elapses.
fn serve_connection<B, IO, S, E>(
    hyper_io: IO,
    hyper_svc: S,
    builder: ConnectionBuilder<E>,
    mut watcher: Option<tokio::sync::watch::Receiver<()>>,
    max_connection_age: Option<Duration>,
) where
    B: http_body::Body + Send + 'static,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
    IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
    S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
    E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
{
    tokio::spawn(async move {
        {
            // Resolves when the shutdown signal is broadcast; never resolves
            // when `watcher` is `None` (non-graceful mode).
            let mut sig = pin!(Fuse {
                inner: watcher.as_mut().map(|w| w.changed()),
            });

            let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));

            // Resolves after `max_connection_age`, or never when `None`.
            let mut sleep = pin!(sleep_or_pending(max_connection_age));

            loop {
                tokio::select! {
                    rv = &mut conn => {
                        if let Err(err) = rv {
                            debug!("failed serving connection: {}", DisplayErrorStack(&*err));
                        }
                        break;
                    },
                    _ = &mut sleep  => {
                        // Max connection age reached: begin graceful shutdown
                        // and re-arm the timer with a never-resolving future
                        // so this branch fires only once.
                        conn.as_mut().graceful_shutdown();
                        sleep.set(sleep_or_pending(None));
                    },
                    _ = &mut sig => {
                        conn.as_mut().graceful_shutdown();
                    }
                }
            }
        }

        // Dropping the watch receiver is what lets the accept loop's
        // `signal_tx.closed()` resolve once all connections are done.
        drop(watcher);
        trace!("connection closed");
    });
}
839
840async fn sleep_or_pending(wait_for: Option<Duration>) {
841    match wait_for {
842        Some(wait) => tokio::time::sleep(wait).await,
843        None => future::pending().await,
844    };
845}
846
#[cfg(feature = "router")]
impl<L> Router<L> {
    /// Create a router from an already-configured `Server` builder and routes.
    pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
        Self { server, routes }
    }
}
853
#[cfg(feature = "router")]
impl<L> Router<L> {
    /// Add a new service to this router.
    pub fn add_service<S>(self, svc: S) -> Self
    where
        S: Service<Request<Body>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + Sync
            + 'static,
        S::Response: axum::response::IntoResponse,
        S::Future: Send + 'static,
    {
        Self {
            routes: self.routes.add_service(svc),
            server: self.server,
        }
    }

    /// Add a new optional service to this router.
    ///
    /// # Note
    /// Even when the argument given is `None` this will capture *all* requests to this service name.
    /// As a result, one cannot use this to toggle between two identically named implementations.
    pub fn add_optional_service<S>(self, svc: Option<S>) -> Self
    where
        S: Service<Request<Body>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + Sync
            + 'static,
        S::Response: axum::response::IntoResponse,
        S::Future: Send + 'static,
    {
        match svc {
            Some(svc) => self.add_service(svc),
            None => self,
        }
    }

    /// Consume this [`Router`] creating a future that will execute the server
    /// on [tokio]'s default executor.
    ///
    /// [tokio]: https://docs.rs/tokio
    pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
    where
        L: Layer<Routes> + Clone,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let routes = self.routes.prepare();
        self.server.serve(addr, routes).await
    }

    /// Consume this [`Router`] creating a future that will execute the server
    /// on [tokio]'s default executor, shutting down once the provided signal
    /// resolves.
    ///
    /// [tokio]: https://docs.rs/tokio
    pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
        self,
        addr: SocketAddr,
        signal: F,
    ) -> Result<(), super::Error>
    where
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let routes = self.routes.prepare();
        self.server
            .serve_with_shutdown(addr, routes, signal)
            .await
    }

    /// Consume this [`Router`] creating a future that will execute the server
    /// on the provided incoming stream of `AsyncRead + AsyncWrite`.
    ///
    /// This method discards any provided [`Server`] TCP configuration.
    pub async fn serve_with_incoming<I, IO, IE, ResBody>(
        self,
        incoming: I,
    ) -> Result<(), super::Error>
    where
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IE: Into<crate::BoxError>,
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let routes = self.routes.prepare();
        self.server
            .serve_with_incoming(routes, incoming)
            .await
    }

    /// Consume this [`Router`] creating a future that will execute the server
    /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
    /// `serve_with_shutdown` this method will also take a signal future to
    /// gracefully shutdown the server.
    ///
    /// This method discards any provided [`Server`] TCP configuration.
    pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
        self,
        incoming: I,
        signal: F,
    ) -> Result<(), super::Error>
    where
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IE: Into<crate::BoxError>,
        F: Future<Output = ()>,
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
            Into<crate::BoxError> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::BoxError>,
    {
        let routes = self.routes.prepare();
        self.server
            .serve_with_incoming_shutdown(routes, incoming, signal)
            .await
    }
}
996
997impl<L> fmt::Debug for Server<L> {
998    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
999        f.debug_struct("Builder").finish()
1000    }
1001}
1002
/// Request-level service wrapper: delegates to the user's inner service and,
/// when a trace interceptor is configured, opens a `tracing` span per request.
#[derive(Clone)]
struct Svc<S> {
    // The wrapped inner service that produces the actual response.
    inner: S,
    // Optional callback that builds a `tracing::Span` from the request head
    // (it is invoked with a body-less `Request<()>`); `None` disables
    // per-request span creation.
    trace_interceptor: Option<TraceInterceptor>,
}
1008
1009impl<S, ResBody> Service<Request<Body>> for Svc<S>
1010where
1011    S: Service<Request<Body>, Response = Response<ResBody>>,
1012    S::Error: Into<crate::BoxError>,
1013    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1014    ResBody::Error: Into<crate::BoxError>,
1015{
1016    type Response = Response<Body>;
1017    type Error = crate::BoxError;
1018    type Future = SvcFuture<S::Future>;
1019
1020    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1021        self.inner.poll_ready(cx).map_err(Into::into)
1022    }
1023
1024    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
1025        let span = if let Some(trace_interceptor) = &self.trace_interceptor {
1026            let (parts, body) = req.into_parts();
1027            let bodyless_request = Request::from_parts(parts, ());
1028
1029            let span = trace_interceptor(&bodyless_request);
1030
1031            let (parts, _) = bodyless_request.into_parts();
1032            req = Request::from_parts(parts, body);
1033
1034            span
1035        } else {
1036            tracing::Span::none()
1037        };
1038
1039        SvcFuture {
1040            inner: self.inner.call(req),
1041            span,
1042        }
1043    }
1044}
1045
/// Response future for [`Svc`]: polls the inner service's future while the
/// request's `tracing` span is entered, then maps the response body into
/// [`Body`].
#[pin_project]
struct SvcFuture<F> {
    // The inner service's response future.
    #[pin]
    inner: F,
    // Span entered on every poll so work done by the inner future is
    // attributed to this request.
    span: tracing::Span,
}
1052
1053impl<F, E, ResBody> Future for SvcFuture<F>
1054where
1055    F: Future<Output = Result<Response<ResBody>, E>>,
1056    E: Into<crate::BoxError>,
1057    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1058    ResBody::Error: Into<crate::BoxError>,
1059{
1060    type Output = Result<Response<Body>, crate::BoxError>;
1061
1062    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1063        let this = self.project();
1064        let _guard = this.span.enter();
1065
1066        let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
1067        let response = response.map(|body| Body::new(body.map_err(Into::into)));
1068        Poll::Ready(Ok(response))
1069    }
1070}
1071
1072impl<S> fmt::Debug for Svc<S> {
1073    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1074        f.debug_struct("Svc").finish()
1075    }
1076}
1077
/// Make-service: for each accepted connection, builds the boxed tower stack
/// (`BoxService`) that will handle that connection's requests.
#[derive(Clone)]
struct MakeSvc<S, IO> {
    // Cap on concurrent in-flight requests, applied via `ConcurrencyLimitLayer`
    // when building each connection's stack; `None` means no limit.
    concurrency_limit: Option<usize>,
    // When true, a `LoadShedLayer` is inserted so requests fail fast instead
    // of waiting when the service is not ready.
    load_shed: bool,
    // Timeout handed to `GrpcTimeout`; `None` disables it.
    timeout: Option<Duration>,
    // The user's underlying service, cloned once per connection.
    inner: S,
    // Optional span factory passed down to `Svc`.
    trace_interceptor: Option<TraceInterceptor>,
    // Ties the `IO` type parameter without storing any IO value; `fn() -> IO`
    // keeps this struct `Send`/`Sync` regardless of `IO`.
    _io: PhantomData<fn() -> IO>,
}
1087
impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
where
    IO: Connected + 'static,
    S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
    S::Future: Send,
    S::Error: Into<crate::BoxError> + Send,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::BoxError>,
{
    type Response = BoxService;
    type Error = crate::BoxError;
    // Stack construction is synchronous, so the future is immediately ready.
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Building the per-connection stack never blocks: always ready.
        Ok(()).into()
    }

    // Builds the middleware stack that serves a single accepted connection.
    // NOTE: the layer order in the two builder chains below is load-bearing;
    // with `ServiceBuilder`, the first `layer` added is the outermost.
    fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
        // Snapshot the peer's connection metadata (e.g. remote address,
        // presumably TLS info on TLS connections — see `Connected` impls).
        let conn_info = io.connect_info();

        let svc = self.inner.clone();
        let concurrency_limit = self.concurrency_limit;
        let timeout = self.timeout;
        let trace_interceptor = self.trace_interceptor.clone();

        // Inner stack. Request path:
        //   RecoverError -> (LoadShed?) -> (ConcurrencyLimit?) -> GrpcTimeout -> user service
        let svc = ServiceBuilder::new()
            .layer(RecoverErrorLayer::new())
            .option_layer(self.load_shed.then_some(LoadShedLayer::new()))
            .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
            .layer_fn(|s| GrpcTimeout::new(s, timeout))
            .service(svc);

        // Outer stack: erase the service type into a `BoxCloneService`, attach
        // the connection info, and wrap in `Svc` for per-request tracing.
        let svc = ServiceBuilder::new()
            .layer(BoxCloneService::layer())
            .layer(ConnectInfoLayer::new(conn_info.clone()))
            .service(Svc {
                inner: svc,
                trace_interceptor,
            });

        future::ready(Ok(svc))
    }
}
1131
// From `futures-util` crate, borrowed since this is the only dependency tonic requires.
// LICENSE: MIT or Apache-2.0
// A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
#[pin_project]
struct Fuse<F> {
    // The wrapped future; set to `None` once it completes so later polls
    // return `Poll::Pending` instead of polling an already-finished future
    // (which would be undefined behavior contract-wise for `Future`).
    #[pin]
    inner: Option<F>,
}
1140
1141impl<F> Future for Fuse<F>
1142where
1143    F: Future,
1144{
1145    type Output = F::Output;
1146
1147    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1148        match self.as_mut().project().inner.as_pin_mut() {
1149            Some(fut) => fut.poll(cx).map(|output| {
1150                self.project().inner.set(None);
1151                output
1152            }),
1153            None => Poll::Pending,
1154        }
1155    }
1156}