1mod conn;
4mod display_error_stack;
5mod incoming;
6mod io_stream;
7mod service;
8#[cfg(feature = "_tls-any")]
9mod tls;
10#[cfg(unix)]
11mod unix;
12
13use tokio_stream::StreamExt as _;
14use tracing::{debug, trace};
15
16#[cfg(feature = "router")]
17use crate::{server::NamedService, service::Routes};
18
19#[cfg(feature = "router")]
20use std::convert::Infallible;
21
22pub use conn::{Connected, TcpConnectInfo};
23use hyper_util::{
24 rt::{TokioExecutor, TokioIo, TokioTimer},
25 server::conn::auto::{Builder as ConnectionBuilder, HttpServerConnExec},
26 service::TowerToHyperService,
27};
28#[cfg(feature = "_tls-any")]
29pub use tls::ServerTlsConfig;
30
31#[cfg(feature = "_tls-any")]
32pub use conn::TlsConnectInfo;
33
34#[cfg(feature = "_tls-any")]
35use self::service::TlsAcceptor;
36
37#[cfg(unix)]
38pub use unix::UdsConnectInfo;
39
40pub use incoming::TcpIncoming;
41
42#[cfg(feature = "_tls-any")]
43use crate::transport::Error;
44
45use self::service::{ConnectInfoLayer, ServerIo};
46use super::service::GrpcTimeout;
47use crate::body::Body;
48use crate::service::RecoverErrorLayer;
49use crate::transport::server::display_error_stack::DisplayErrorStack;
50use bytes::Bytes;
51use http::{Request, Response};
52use http_body_util::BodyExt;
53use hyper::{body::Incoming, service::Service as HyperService};
54use pin_project::pin_project;
55use std::{
56 fmt,
57 future::{self, Future},
58 marker::PhantomData,
59 net::SocketAddr,
60 pin::{pin, Pin},
61 sync::Arc,
62 task::{ready, Context, Poll},
63 time::Duration,
64};
65use tokio::io::{AsyncRead, AsyncWrite};
66use tokio_stream::Stream;
67use tower::{
68 layer::util::{Identity, Stack},
69 layer::Layer,
70 limit::concurrency::ConcurrencyLimitLayer,
71 load_shed::LoadShedLayer,
72 util::BoxCloneService,
73 Service, ServiceBuilder, ServiceExt,
74};
75
76type BoxService = tower::util::BoxCloneService<Request<Body>, Response<Body>, crate::BoxError>;
77type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
78
79const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
80
81#[derive(Clone)]
90pub struct Server<L = Identity> {
91 trace_interceptor: Option<TraceInterceptor>,
92 concurrency_limit: Option<usize>,
93 load_shed: bool,
94 timeout: Option<Duration>,
95 #[cfg(feature = "_tls-any")]
96 tls: Option<TlsAcceptor>,
97 init_stream_window_size: Option<u32>,
98 init_connection_window_size: Option<u32>,
99 max_concurrent_streams: Option<u32>,
100 tcp_keepalive: Option<Duration>,
101 tcp_nodelay: bool,
102 http2_keepalive_interval: Option<Duration>,
103 http2_keepalive_timeout: Duration,
104 http2_adaptive_window: Option<bool>,
105 http2_max_pending_accept_reset_streams: Option<usize>,
106 http2_max_header_list_size: Option<u32>,
107 max_frame_size: Option<u32>,
108 accept_http1: bool,
109 service_builder: ServiceBuilder<L>,
110 max_connection_age: Option<Duration>,
111}
112
113impl Default for Server<Identity> {
114 fn default() -> Self {
115 Self {
116 trace_interceptor: None,
117 concurrency_limit: None,
118 load_shed: false,
119 timeout: None,
120 #[cfg(feature = "_tls-any")]
121 tls: None,
122 init_stream_window_size: None,
123 init_connection_window_size: None,
124 max_concurrent_streams: None,
125 tcp_keepalive: None,
126 tcp_nodelay: false,
127 http2_keepalive_interval: None,
128 http2_keepalive_timeout: DEFAULT_HTTP2_KEEPALIVE_TIMEOUT,
129 http2_adaptive_window: None,
130 http2_max_pending_accept_reset_streams: None,
131 http2_max_header_list_size: None,
132 max_frame_size: None,
133 accept_http1: false,
134 service_builder: Default::default(),
135 max_connection_age: None,
136 }
137 }
138}
139
140#[cfg(feature = "router")]
142#[derive(Debug)]
143pub struct Router<L = Identity> {
144 server: Server<L>,
145 routes: Routes,
146}
147
148impl Server {
149 pub fn builder() -> Self {
151 Server {
152 tcp_nodelay: true,
153 accept_http1: false,
154 ..Default::default()
155 }
156 }
157}
158
159impl<L> Server<L> {
160 #[cfg(feature = "_tls-any")]
162 pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
163 Ok(Server {
164 tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
165 ..self
166 })
167 }
168
169 #[must_use]
180 pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
181 Server {
182 concurrency_limit: Some(limit),
183 ..self
184 }
185 }
186
187 #[must_use]
204 pub fn load_shed(self, load_shed: bool) -> Self {
205 Server { load_shed, ..self }
206 }
207
208 #[must_use]
220 pub fn timeout(self, timeout: Duration) -> Self {
221 Server {
222 timeout: Some(timeout),
223 ..self
224 }
225 }
226
227 #[must_use]
234 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
235 Server {
236 init_stream_window_size: sz.into(),
237 ..self
238 }
239 }
240
241 #[must_use]
245 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
246 Server {
247 init_connection_window_size: sz.into(),
248 ..self
249 }
250 }
251
252 #[must_use]
259 pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
260 Server {
261 max_concurrent_streams: max.into(),
262 ..self
263 }
264 }
265
266 #[must_use]
280 pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
281 Server {
282 max_connection_age: Some(max_connection_age),
283 ..self
284 }
285 }
286
287 #[must_use]
297 pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
298 Server {
299 http2_keepalive_interval,
300 ..self
301 }
302 }
303
304 #[must_use]
312 pub fn http2_keepalive_timeout(mut self, http2_keepalive_timeout: Option<Duration>) -> Self {
313 if let Some(timeout) = http2_keepalive_timeout {
314 self.http2_keepalive_timeout = timeout;
315 }
316 self
317 }
318
319 #[must_use]
323 pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
324 Server {
325 http2_adaptive_window: enabled,
326 ..self
327 }
328 }
329
330 #[must_use]
336 pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
337 Server {
338 http2_max_pending_accept_reset_streams: max,
339 ..self
340 }
341 }
342
343 #[must_use]
354 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
355 Server {
356 tcp_keepalive,
357 ..self
358 }
359 }
360
361 #[must_use]
363 pub fn tcp_nodelay(self, enabled: bool) -> Self {
364 Server {
365 tcp_nodelay: enabled,
366 ..self
367 }
368 }
369
370 #[must_use]
374 pub fn http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self {
375 Server {
376 http2_max_header_list_size: max.into(),
377 ..self
378 }
379 }
380
381 #[must_use]
387 pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
388 Server {
389 max_frame_size: frame_size.into(),
390 ..self
391 }
392 }
393
394 #[must_use]
403 pub fn accept_http1(self, accept_http1: bool) -> Self {
404 Server {
405 accept_http1,
406 ..self
407 }
408 }
409
410 #[must_use]
412 pub fn trace_fn<F>(self, f: F) -> Self
413 where
414 F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
415 {
416 Server {
417 trace_interceptor: Some(Arc::new(f)),
418 ..self
419 }
420 }
421
422 #[cfg(feature = "router")]
427 pub fn add_service<S>(&mut self, svc: S) -> Router<L>
428 where
429 S: Service<Request<Body>, Error = Infallible>
430 + NamedService
431 + Clone
432 + Send
433 + Sync
434 + 'static,
435 S::Response: axum::response::IntoResponse,
436 S::Future: Send + 'static,
437 L: Clone,
438 {
439 Router::new(self.clone(), Routes::new(svc))
440 }
441
442 #[cfg(feature = "router")]
451 pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
452 where
453 S: Service<Request<Body>, Error = Infallible>
454 + NamedService
455 + Clone
456 + Send
457 + Sync
458 + 'static,
459 S::Response: axum::response::IntoResponse,
460 S::Future: Send + 'static,
461 L: Clone,
462 {
463 let routes = svc.map(Routes::new).unwrap_or_default();
464 Router::new(self.clone(), routes)
465 }
466
467 #[cfg(feature = "router")]
472 pub fn add_routes(&mut self, routes: Routes) -> Router<L>
473 where
474 L: Clone,
475 {
476 Router::new(self.clone(), routes)
477 }
478
479 pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
541 Server {
542 service_builder: self.service_builder.layer(new_layer),
543 trace_interceptor: self.trace_interceptor,
544 concurrency_limit: self.concurrency_limit,
545 load_shed: self.load_shed,
546 timeout: self.timeout,
547 #[cfg(feature = "_tls-any")]
548 tls: self.tls,
549 init_stream_window_size: self.init_stream_window_size,
550 init_connection_window_size: self.init_connection_window_size,
551 max_concurrent_streams: self.max_concurrent_streams,
552 tcp_keepalive: self.tcp_keepalive,
553 tcp_nodelay: self.tcp_nodelay,
554 http2_keepalive_interval: self.http2_keepalive_interval,
555 http2_keepalive_timeout: self.http2_keepalive_timeout,
556 http2_adaptive_window: self.http2_adaptive_window,
557 http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
558 http2_max_header_list_size: self.http2_max_header_list_size,
559 max_frame_size: self.max_frame_size,
560 accept_http1: self.accept_http1,
561 max_connection_age: self.max_connection_age,
562 }
563 }
564
565 fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
566 Ok(TcpIncoming::bind(addr)
567 .map_err(super::Error::from_source)?
568 .with_nodelay(Some(self.tcp_nodelay))
569 .with_keepalive(self.tcp_keepalive))
570 }
571
572 pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
574 where
575 L: Layer<S>,
576 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
577 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
578 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
579 Into<crate::BoxError> + Send + 'static,
580 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
581 ResBody::Error: Into<crate::BoxError>,
582 {
583 let incoming = self.bind_incoming(addr)?;
584 self.serve_with_incoming(svc, incoming).await
585 }
586
587 pub async fn serve_with_shutdown<S, F, ResBody>(
589 self,
590 addr: SocketAddr,
591 svc: S,
592 signal: F,
593 ) -> Result<(), super::Error>
594 where
595 L: Layer<S>,
596 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
597 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
598 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
599 Into<crate::BoxError> + Send + 'static,
600 F: Future<Output = ()>,
601 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
602 ResBody::Error: Into<crate::BoxError>,
603 {
604 let incoming = self.bind_incoming(addr)?;
605 self.serve_with_incoming_shutdown(svc, incoming, signal)
606 .await
607 }
608
609 pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
611 self,
612 svc: S,
613 incoming: I,
614 ) -> Result<(), super::Error>
615 where
616 L: Layer<S>,
617 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
618 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
619 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
620 Into<crate::BoxError> + Send + 'static,
621 I: Stream<Item = Result<IO, IE>>,
622 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
623 IE: Into<crate::BoxError>,
624 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
625 ResBody::Error: Into<crate::BoxError>,
626 {
627 self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
628 .await
629 }
630
631 pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
633 self,
634 svc: S,
635 incoming: I,
636 signal: F,
637 ) -> Result<(), super::Error>
638 where
639 L: Layer<S>,
640 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
641 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
642 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
643 Into<crate::BoxError> + Send + 'static,
644 I: Stream<Item = Result<IO, IE>>,
645 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
646 IE: Into<crate::BoxError>,
647 F: Future<Output = ()>,
648 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
649 ResBody::Error: Into<crate::BoxError>,
650 {
651 self.serve_internal(svc, incoming, Some(signal)).await
652 }
653
654 async fn serve_internal<S, I, F, IO, IE, ResBody>(
655 self,
656 svc: S,
657 incoming: I,
658 signal: Option<F>,
659 ) -> Result<(), super::Error>
660 where
661 L: Layer<S>,
662 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
663 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
664 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
665 Into<crate::BoxError> + Send + 'static,
666 I: Stream<Item = Result<IO, IE>>,
667 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
668 IE: Into<crate::BoxError>,
669 F: Future<Output = ()>,
670 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
671 ResBody::Error: Into<crate::BoxError>,
672 {
673 let trace_interceptor = self.trace_interceptor.clone();
674 let concurrency_limit = self.concurrency_limit;
675 let load_shed = self.load_shed;
676 let init_connection_window_size = self.init_connection_window_size;
677 let init_stream_window_size = self.init_stream_window_size;
678 let max_concurrent_streams = self.max_concurrent_streams;
679 let timeout = self.timeout;
680 let max_header_list_size = self.http2_max_header_list_size;
681 let max_frame_size = self.max_frame_size;
682 let http2_only = !self.accept_http1;
683
684 let http2_keepalive_interval = self.http2_keepalive_interval;
685 let http2_keepalive_timeout = self.http2_keepalive_timeout;
686 let http2_adaptive_window = self.http2_adaptive_window;
687 let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
688 let max_connection_age = self.max_connection_age;
689
690 let svc = self.service_builder.service(svc);
691
692 let incoming = io_stream::ServerIoStream::new(
693 incoming,
694 #[cfg(feature = "_tls-any")]
695 self.tls,
696 );
697 let mut svc = MakeSvc {
698 inner: svc,
699 concurrency_limit,
700 load_shed,
701 timeout,
702 trace_interceptor,
703 _io: PhantomData,
704 };
705
706 let server = {
707 let mut builder = ConnectionBuilder::new(TokioExecutor::new());
708
709 if http2_only {
710 builder = builder.http2_only();
711 }
712
713 builder
714 .http2()
715 .timer(TokioTimer::new())
716 .initial_connection_window_size(init_connection_window_size)
717 .initial_stream_window_size(init_stream_window_size)
718 .max_concurrent_streams(max_concurrent_streams)
719 .keep_alive_interval(http2_keepalive_interval)
720 .keep_alive_timeout(http2_keepalive_timeout)
721 .adaptive_window(http2_adaptive_window.unwrap_or_default())
722 .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
723 .max_frame_size(max_frame_size);
724
725 if let Some(max_header_list_size) = max_header_list_size {
726 builder.http2().max_header_list_size(max_header_list_size);
727 }
728
729 builder
730 };
731
732 let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
733 let signal_tx = Arc::new(signal_tx);
734
735 let graceful = signal.is_some();
736 let mut sig = pin!(Fuse { inner: signal });
737 let mut incoming = pin!(incoming);
738
739 loop {
740 tokio::select! {
741 _ = &mut sig => {
742 trace!("signal received, shutting down");
743 break;
744 },
745 io = incoming.next() => {
746 let io = match io {
747 Some(Ok(io)) => io,
748 Some(Err(e)) => {
749 trace!("error accepting connection: {}", DisplayErrorStack(&*e));
750 continue;
751 },
752 None => {
753 break
754 },
755 };
756
757 trace!("connection accepted");
758
759 let req_svc = svc
760 .call(&io)
761 .await
762 .map_err(super::Error::from_source)?;
763
764 let hyper_io = TokioIo::new(io);
765 let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(Body::new)));
766
767 serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age);
768 }
769 }
770 }
771
772 if graceful {
773 let _ = signal_tx.send(());
774 drop(signal_rx);
775 trace!(
776 "waiting for {} connections to close",
777 signal_tx.receiver_count()
778 );
779
780 signal_tx.closed().await;
782 }
783
784 Ok(())
785 }
786}
787
788fn serve_connection<B, IO, S, E>(
791 hyper_io: IO,
792 hyper_svc: S,
793 builder: ConnectionBuilder<E>,
794 mut watcher: Option<tokio::sync::watch::Receiver<()>>,
795 max_connection_age: Option<Duration>,
796) where
797 B: http_body::Body + Send + 'static,
798 B::Data: Send,
799 B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
800 IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
801 S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
802 S::Future: Send + 'static,
803 S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
804 E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
805{
806 tokio::spawn(async move {
807 {
808 let mut sig = pin!(Fuse {
809 inner: watcher.as_mut().map(|w| w.changed()),
810 });
811
812 let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));
813
814 let mut sleep = pin!(sleep_or_pending(max_connection_age));
815
816 loop {
817 tokio::select! {
818 rv = &mut conn => {
819 if let Err(err) = rv {
820 debug!("failed serving connection: {}", DisplayErrorStack(&*err));
821 }
822 break;
823 },
824 _ = &mut sleep => {
825 conn.as_mut().graceful_shutdown();
826 sleep.set(sleep_or_pending(None));
827 },
828 _ = &mut sig => {
829 conn.as_mut().graceful_shutdown();
830 }
831 }
832 }
833 }
834
835 drop(watcher);
836 trace!("connection closed");
837 });
838}
839
840async fn sleep_or_pending(wait_for: Option<Duration>) {
841 match wait_for {
842 Some(wait) => tokio::time::sleep(wait).await,
843 None => future::pending().await,
844 };
845}
846
847#[cfg(feature = "router")]
848impl<L> Router<L> {
849 pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
850 Self { server, routes }
851 }
852}
853
854#[cfg(feature = "router")]
855impl<L> Router<L> {
856 pub fn add_service<S>(mut self, svc: S) -> Self
858 where
859 S: Service<Request<Body>, Error = Infallible>
860 + NamedService
861 + Clone
862 + Send
863 + Sync
864 + 'static,
865 S::Response: axum::response::IntoResponse,
866 S::Future: Send + 'static,
867 {
868 self.routes = self.routes.add_service(svc);
869 self
870 }
871
872 pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
878 where
879 S: Service<Request<Body>, Error = Infallible>
880 + NamedService
881 + Clone
882 + Send
883 + Sync
884 + 'static,
885 S::Response: axum::response::IntoResponse,
886 S::Future: Send + 'static,
887 {
888 if let Some(svc) = svc {
889 self.routes = self.routes.add_service(svc);
890 }
891 self
892 }
893
894 pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
900 where
901 L: Layer<Routes> + Clone,
902 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
903 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
904 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
905 Into<crate::BoxError> + Send,
906 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
907 ResBody::Error: Into<crate::BoxError>,
908 {
909 self.server.serve(addr, self.routes.prepare()).await
910 }
911
912 pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
919 self,
920 addr: SocketAddr,
921 signal: F,
922 ) -> Result<(), super::Error>
923 where
924 L: Layer<Routes>,
925 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
926 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
927 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
928 Into<crate::BoxError> + Send,
929 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
930 ResBody::Error: Into<crate::BoxError>,
931 {
932 self.server
933 .serve_with_shutdown(addr, self.routes.prepare(), signal)
934 .await
935 }
936
937 pub async fn serve_with_incoming<I, IO, IE, ResBody>(
944 self,
945 incoming: I,
946 ) -> Result<(), super::Error>
947 where
948 I: Stream<Item = Result<IO, IE>>,
949 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
950 IE: Into<crate::BoxError>,
951 L: Layer<Routes>,
952
953 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
954 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
955 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
956 Into<crate::BoxError> + Send,
957 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
958 ResBody::Error: Into<crate::BoxError>,
959 {
960 self.server
961 .serve_with_incoming(self.routes.prepare(), incoming)
962 .await
963 }
964
965 pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
974 self,
975 incoming: I,
976 signal: F,
977 ) -> Result<(), super::Error>
978 where
979 I: Stream<Item = Result<IO, IE>>,
980 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
981 IE: Into<crate::BoxError>,
982 F: Future<Output = ()>,
983 L: Layer<Routes>,
984 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
985 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
986 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
987 Into<crate::BoxError> + Send,
988 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
989 ResBody::Error: Into<crate::BoxError>,
990 {
991 self.server
992 .serve_with_incoming_shutdown(self.routes.prepare(), incoming, signal)
993 .await
994 }
995}
996
997impl<L> fmt::Debug for Server<L> {
998 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
999 f.debug_struct("Builder").finish()
1000 }
1001}
1002
1003#[derive(Clone)]
1004struct Svc<S> {
1005 inner: S,
1006 trace_interceptor: Option<TraceInterceptor>,
1007}
1008
1009impl<S, ResBody> Service<Request<Body>> for Svc<S>
1010where
1011 S: Service<Request<Body>, Response = Response<ResBody>>,
1012 S::Error: Into<crate::BoxError>,
1013 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1014 ResBody::Error: Into<crate::BoxError>,
1015{
1016 type Response = Response<Body>;
1017 type Error = crate::BoxError;
1018 type Future = SvcFuture<S::Future>;
1019
1020 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1021 self.inner.poll_ready(cx).map_err(Into::into)
1022 }
1023
1024 fn call(&mut self, mut req: Request<Body>) -> Self::Future {
1025 let span = if let Some(trace_interceptor) = &self.trace_interceptor {
1026 let (parts, body) = req.into_parts();
1027 let bodyless_request = Request::from_parts(parts, ());
1028
1029 let span = trace_interceptor(&bodyless_request);
1030
1031 let (parts, _) = bodyless_request.into_parts();
1032 req = Request::from_parts(parts, body);
1033
1034 span
1035 } else {
1036 tracing::Span::none()
1037 };
1038
1039 SvcFuture {
1040 inner: self.inner.call(req),
1041 span,
1042 }
1043 }
1044}
1045
1046#[pin_project]
1047struct SvcFuture<F> {
1048 #[pin]
1049 inner: F,
1050 span: tracing::Span,
1051}
1052
1053impl<F, E, ResBody> Future for SvcFuture<F>
1054where
1055 F: Future<Output = Result<Response<ResBody>, E>>,
1056 E: Into<crate::BoxError>,
1057 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1058 ResBody::Error: Into<crate::BoxError>,
1059{
1060 type Output = Result<Response<Body>, crate::BoxError>;
1061
1062 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1063 let this = self.project();
1064 let _guard = this.span.enter();
1065
1066 let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
1067 let response = response.map(|body| Body::new(body.map_err(Into::into)));
1068 Poll::Ready(Ok(response))
1069 }
1070}
1071
1072impl<S> fmt::Debug for Svc<S> {
1073 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1074 f.debug_struct("Svc").finish()
1075 }
1076}
1077
1078#[derive(Clone)]
1079struct MakeSvc<S, IO> {
1080 concurrency_limit: Option<usize>,
1081 load_shed: bool,
1082 timeout: Option<Duration>,
1083 inner: S,
1084 trace_interceptor: Option<TraceInterceptor>,
1085 _io: PhantomData<fn() -> IO>,
1086}
1087
1088impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
1089where
1090 IO: Connected + 'static,
1091 S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1092 S::Future: Send,
1093 S::Error: Into<crate::BoxError> + Send,
1094 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1095 ResBody::Error: Into<crate::BoxError>,
1096{
1097 type Response = BoxService;
1098 type Error = crate::BoxError;
1099 type Future = future::Ready<Result<Self::Response, Self::Error>>;
1100
1101 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1102 Ok(()).into()
1103 }
1104
1105 fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
1106 let conn_info = io.connect_info();
1107
1108 let svc = self.inner.clone();
1109 let concurrency_limit = self.concurrency_limit;
1110 let timeout = self.timeout;
1111 let trace_interceptor = self.trace_interceptor.clone();
1112
1113 let svc = ServiceBuilder::new()
1114 .layer(RecoverErrorLayer::new())
1115 .option_layer(self.load_shed.then_some(LoadShedLayer::new()))
1116 .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
1117 .layer_fn(|s| GrpcTimeout::new(s, timeout))
1118 .service(svc);
1119
1120 let svc = ServiceBuilder::new()
1121 .layer(BoxCloneService::layer())
1122 .layer(ConnectInfoLayer::new(conn_info.clone()))
1123 .service(Svc {
1124 inner: svc,
1125 trace_interceptor,
1126 });
1127
1128 future::ready(Ok(svc))
1129 }
1130}
1131
1132#[pin_project]
1136struct Fuse<F> {
1137 #[pin]
1138 inner: Option<F>,
1139}
1140
1141impl<F> Future for Fuse<F>
1142where
1143 F: Future,
1144{
1145 type Output = F::Output;
1146
1147 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1148 match self.as_mut().project().inner.as_pin_mut() {
1149 Some(fut) => fut.poll(cx).map(|output| {
1150 self.project().inner.set(None);
1151 output
1152 }),
1153 None => Poll::Pending,
1154 }
1155 }
1156}