1#[cfg(feature = "_tls-any")]
2use super::service::TlsConnector;
3use super::service::{self, Executor, SharedExec};
4use super::uds_connector::UdsConnector;
5use super::Channel;
6#[cfg(feature = "_tls-any")]
7use super::ClientTlsConfig;
8#[cfg(feature = "_tls-any")]
9use crate::transport::error;
10use crate::transport::Error;
11use bytes::Bytes;
12use http::{uri::Uri, HeaderValue};
13use hyper::rt;
14use hyper_util::client::legacy::connect::HttpConnector;
15use std::{fmt, future::Future, net::IpAddr, pin::Pin, str, str::FromStr, time::Duration};
16use tower_service::Service;
17
/// The kind of destination an [`Endpoint`] points at.
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum EndpointType {
    /// A destination described by a parsed [`Uri`].
    Uri(Uri),
    /// A Unix domain socket; the string is the socket file path with any
    /// `unix:` / `unix://` prefix already removed by the constructors.
    Uds(String),
}
23
/// Builder holding the destination and connection settings used to create a
/// [`Channel`].
///
/// Values are set through the consuming builder methods and read when one of
/// the `connect*` methods constructs the channel.
#[derive(Clone)]
pub struct Endpoint {
    /// Where this endpoint connects to (URI or Unix domain socket path).
    pub(crate) uri: EndpointType,
    /// URI handed out by [`Endpoint::uri`] when `uri` is a Unix domain socket.
    fallback_uri: Uri,
    /// Optional origin override set by [`Endpoint::origin`]; consumed outside this file.
    pub(crate) origin: Option<Uri>,
    /// Optional user-agent header value set by [`Endpoint::user_agent`].
    pub(crate) user_agent: Option<HeaderValue>,
    /// Optional timeout set by [`Endpoint::timeout`]; consumed outside this file.
    pub(crate) timeout: Option<Duration>,
    /// Optional concurrency limit set by [`Endpoint::concurrency_limit`].
    pub(crate) concurrency_limit: Option<usize>,
    /// Optional `(limit, duration)` pair set by [`Endpoint::rate_limit`].
    pub(crate) rate_limit: Option<(u64, Duration)>,
    /// TLS connector handed to `service::Connector::new`; `None` means no TLS configured.
    #[cfg(feature = "_tls-any")]
    pub(crate) tls: Option<TlsConnector>,
    /// Optional buffer size set by [`Endpoint::buffer_size`].
    pub(crate) buffer_size: Option<usize>,
    /// Optional value set by [`Endpoint::initial_stream_window_size`].
    pub(crate) init_stream_window_size: Option<u32>,
    /// Optional value set by [`Endpoint::initial_connection_window_size`].
    pub(crate) init_connection_window_size: Option<u32>,
    /// Forwarded to `HttpConnector::set_keepalive`.
    pub(crate) tcp_keepalive: Option<Duration>,
    /// Forwarded to `HttpConnector::set_keepalive_interval`.
    pub(crate) tcp_keepalive_interval: Option<Duration>,
    /// Forwarded to `HttpConnector::set_keepalive_retries`.
    pub(crate) tcp_keepalive_retries: Option<u32>,
    /// Forwarded to `HttpConnector::set_nodelay`; the constructors default it to `true`.
    pub(crate) tcp_nodelay: bool,
    /// Optional value set by [`Endpoint::http2_keep_alive_interval`].
    pub(crate) http2_keep_alive_interval: Option<Duration>,
    /// Optional value set by [`Endpoint::keep_alive_timeout`].
    pub(crate) http2_keep_alive_timeout: Option<Duration>,
    /// Optional value set by [`Endpoint::keep_alive_while_idle`].
    pub(crate) http2_keep_alive_while_idle: Option<bool>,
    /// Optional value set by [`Endpoint::http2_max_header_list_size`].
    pub(crate) http2_max_header_list_size: Option<u32>,
    /// Connect timeout; forwarded to `HttpConnector::set_connect_timeout` and used to
    /// wrap custom connectors in a `hyper_timeout::TimeoutConnector`.
    pub(crate) connect_timeout: Option<Duration>,
    /// Optional value set by [`Endpoint::http2_adaptive_window`].
    pub(crate) http2_adaptive_window: Option<bool>,
    /// Forwarded to `HttpConnector::set_local_address`.
    pub(crate) local_address: Option<IpAddr>,
    /// Executor wrapper; the constructors default it to `SharedExec::tokio()`.
    pub(crate) executor: SharedExec,
}
54
55impl Endpoint {
56 #[doc(hidden)]
59 pub fn new<D>(dst: D) -> Result<Self, Error>
60 where
61 D: TryInto<Self>,
62 D::Error: Into<crate::BoxError>,
63 {
64 let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
65 #[cfg(feature = "_tls-any")]
66 if let EndpointType::Uri(uri) = &me.uri {
67 if me.tls.is_none() && uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
68 return me.tls_config(ClientTlsConfig::new().with_enabled_roots());
69 }
70 }
71 Ok(me)
72 }
73
74 fn new_uri(uri: Uri) -> Self {
75 Self {
76 uri: EndpointType::Uri(uri.clone()),
77 fallback_uri: uri,
78 origin: None,
79 user_agent: None,
80 concurrency_limit: None,
81 rate_limit: None,
82 timeout: None,
83 #[cfg(feature = "_tls-any")]
84 tls: None,
85 buffer_size: None,
86 init_stream_window_size: None,
87 init_connection_window_size: None,
88 tcp_keepalive: None,
89 tcp_keepalive_interval: None,
90 tcp_keepalive_retries: None,
91 tcp_nodelay: true,
92 http2_keep_alive_interval: None,
93 http2_keep_alive_timeout: None,
94 http2_keep_alive_while_idle: None,
95 http2_max_header_list_size: None,
96 connect_timeout: None,
97 http2_adaptive_window: None,
98 executor: SharedExec::tokio(),
99 local_address: None,
100 }
101 }
102
103 fn new_uds(uds_filepath: &str) -> Self {
104 Self {
105 uri: EndpointType::Uds(uds_filepath.to_string()),
106 fallback_uri: Uri::from_static("http://tonic"),
107 origin: None,
108 user_agent: None,
109 concurrency_limit: None,
110 rate_limit: None,
111 timeout: None,
112 #[cfg(feature = "_tls-any")]
113 tls: None,
114 buffer_size: None,
115 init_stream_window_size: None,
116 init_connection_window_size: None,
117 tcp_keepalive: None,
118 tcp_keepalive_interval: None,
119 tcp_keepalive_retries: None,
120 tcp_nodelay: true,
121 http2_keep_alive_interval: None,
122 http2_keep_alive_timeout: None,
123 http2_keep_alive_while_idle: None,
124 http2_max_header_list_size: None,
125 connect_timeout: None,
126 http2_adaptive_window: None,
127 executor: SharedExec::tokio(),
128 local_address: None,
129 }
130 }
131
132 pub fn from_static(s: &'static str) -> Self {
143 if s.starts_with("unix:") {
144 let uds_filepath = s
145 .strip_prefix("unix://")
146 .or_else(|| s.strip_prefix("unix:"))
147 .expect("Invalid unix domain socket URI");
148 Self::new_uds(uds_filepath)
149 } else {
150 let uri = Uri::from_static(s);
151 Self::new_uri(uri)
152 }
153 }
154
155 pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
162 let s = str::from_utf8(&s.into())
163 .map_err(|e| Error::new_invalid_uri().with(e))?
164 .to_string();
165 if s.starts_with("unix:") {
166 let uds_filepath = s
167 .strip_prefix("unix://")
168 .or_else(|| s.strip_prefix("unix:"))
169 .ok_or(Error::new_invalid_uri())?;
170 Ok(Self::new_uds(uds_filepath))
171 } else {
172 let uri = Uri::from_maybe_shared(s).map_err(|e| Error::new_invalid_uri().with(e))?;
173 Ok(Self::from(uri))
174 }
175 }
176
177 pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
189 where
190 T: TryInto<HeaderValue>,
191 {
192 user_agent
193 .try_into()
194 .map(|ua| Endpoint {
195 user_agent: Some(ua),
196 ..self
197 })
198 .map_err(|_| Error::new_invalid_user_agent())
199 }
200
201 pub fn origin(self, origin: Uri) -> Self {
214 Endpoint {
215 origin: Some(origin),
216 ..self
217 }
218 }
219
220 pub fn timeout(self, dur: Duration) -> Self {
237 Endpoint {
238 timeout: Some(dur),
239 ..self
240 }
241 }
242
243 pub fn connect_timeout(self, dur: Duration) -> Self {
254 Endpoint {
255 connect_timeout: Some(dur),
256 ..self
257 }
258 }
259
260 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
268 Endpoint {
269 tcp_keepalive,
270 ..self
271 }
272 }
273
274 pub fn tcp_keepalive_interval(self, tcp_keepalive_interval: Option<Duration>) -> Self {
281 Endpoint {
282 tcp_keepalive_interval,
283 ..self
284 }
285 }
286
287 pub fn tcp_keepalive_retries(self, tcp_keepalive_retries: Option<u32>) -> Self {
293 Endpoint {
294 tcp_keepalive_retries,
295 ..self
296 }
297 }
298
299 pub fn concurrency_limit(self, limit: usize) -> Self {
307 Endpoint {
308 concurrency_limit: Some(limit),
309 ..self
310 }
311 }
312
313 pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
322 Endpoint {
323 rate_limit: Some((limit, duration)),
324 ..self
325 }
326 }
327
328 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
335 Endpoint {
336 init_stream_window_size: sz.into(),
337 ..self
338 }
339 }
340
341 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
345 Endpoint {
346 init_connection_window_size: sz.into(),
347 ..self
348 }
349 }
350
351 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
355 Endpoint {
356 buffer_size: sz.into(),
357 ..self
358 }
359 }
360
361 #[cfg(feature = "_tls-any")]
363 pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
364 match &self.uri {
365 EndpointType::Uri(uri) => Ok(Endpoint {
366 tls: Some(
367 tls_config
368 .into_tls_connector(uri)
369 .map_err(Error::from_source)?,
370 ),
371 ..self
372 }),
373 EndpointType::Uds(_) => Err(Error::new(error::Kind::InvalidTlsConfigForUds)),
374 }
375 }
376
377 pub fn tcp_nodelay(self, enabled: bool) -> Self {
379 Endpoint {
380 tcp_nodelay: enabled,
381 ..self
382 }
383 }
384
385 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
387 Endpoint {
388 http2_keep_alive_interval: Some(interval),
389 ..self
390 }
391 }
392
393 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
395 Endpoint {
396 http2_keep_alive_timeout: Some(duration),
397 ..self
398 }
399 }
400
401 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
403 Endpoint {
404 http2_keep_alive_while_idle: Some(enabled),
405 ..self
406 }
407 }
408
409 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
411 Endpoint {
412 http2_adaptive_window: Some(enabled),
413 ..self
414 }
415 }
416
417 pub fn http2_max_header_list_size(self, size: u32) -> Self {
421 Endpoint {
422 http2_max_header_list_size: Some(size),
423 ..self
424 }
425 }
426
427 pub fn executor<E>(mut self, executor: E) -> Self
431 where
432 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
433 {
434 self.executor = SharedExec::new(executor);
435 self
436 }
437
438 pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
439 service::Connector::new(
440 c,
441 #[cfg(feature = "_tls-any")]
442 self.tls.clone(),
443 )
444 }
445
446 pub fn local_address(self, addr: Option<IpAddr>) -> Self {
450 Endpoint {
451 local_address: addr,
452 ..self
453 }
454 }
455
456 pub(crate) fn http_connector(&self) -> service::Connector<HttpConnector> {
457 let mut http = HttpConnector::new();
458 http.enforce_http(false);
459 http.set_nodelay(self.tcp_nodelay);
460 http.set_keepalive(self.tcp_keepalive);
461 http.set_keepalive_interval(self.tcp_keepalive_interval);
462 http.set_keepalive_retries(self.tcp_keepalive_retries);
463 http.set_connect_timeout(self.connect_timeout);
464 http.set_local_address(self.local_address);
465 self.connector(http)
466 }
467
468 pub(crate) fn uds_connector(&self, uds_filepath: &str) -> service::Connector<UdsConnector> {
469 self.connector(UdsConnector::new(uds_filepath))
470 }
471
472 pub async fn connect(&self) -> Result<Channel, Error> {
474 match &self.uri {
475 EndpointType::Uri(_) => Channel::connect(self.http_connector(), self.clone()).await,
476 EndpointType::Uds(uds_filepath) => {
477 Channel::connect(self.uds_connector(uds_filepath.as_str()), self.clone()).await
478 }
479 }
480 }
481
482 pub fn connect_lazy(&self) -> Channel {
487 match &self.uri {
488 EndpointType::Uri(_) => Channel::new(self.http_connector(), self.clone()),
489 EndpointType::Uds(uds_filepath) => {
490 Channel::new(self.uds_connector(uds_filepath.as_str()), self.clone())
491 }
492 }
493 }
494
495 pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
503 where
504 C: Service<Uri> + Send + 'static,
505 C::Response: rt::Read + rt::Write + Send + Unpin,
506 C::Future: Send,
507 crate::BoxError: From<C::Error> + Send,
508 {
509 let connector = self.connector(connector);
510
511 if let Some(connect_timeout) = self.connect_timeout {
512 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
513 connector.set_connect_timeout(Some(connect_timeout));
514 Channel::connect(connector, self.clone()).await
515 } else {
516 Channel::connect(connector, self.clone()).await
517 }
518 }
519
520 pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
528 where
529 C: Service<Uri> + Send + 'static,
530 C::Response: rt::Read + rt::Write + Send + Unpin,
531 C::Future: Send,
532 crate::BoxError: From<C::Error> + Send,
533 {
534 let connector = self.connector(connector);
535 if let Some(connect_timeout) = self.connect_timeout {
536 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
537 connector.set_connect_timeout(Some(connect_timeout));
538 Channel::new(connector, self.clone())
539 } else {
540 Channel::new(connector, self.clone())
541 }
542 }
543
544 pub fn uri(&self) -> &Uri {
554 match &self.uri {
555 EndpointType::Uri(uri) => uri,
556 EndpointType::Uds(_) => &self.fallback_uri,
557 }
558 }
559
560 pub fn get_tcp_nodelay(&self) -> bool {
562 self.tcp_nodelay
563 }
564
565 pub fn get_connect_timeout(&self) -> Option<Duration> {
567 self.connect_timeout
568 }
569
570 pub fn get_tcp_keepalive(&self) -> Option<Duration> {
576 self.tcp_keepalive
577 }
578
579 pub fn get_tcp_keepalive_interval(&self) -> Option<Duration> {
581 self.tcp_keepalive_interval
582 }
583
584 pub fn get_tcp_keepalive_retries(&self) -> Option<u32> {
586 self.tcp_keepalive_retries
587 }
588}
589
590impl From<Uri> for Endpoint {
591 fn from(uri: Uri) -> Self {
592 Self::new_uri(uri)
593 }
594}
595
596impl TryFrom<Bytes> for Endpoint {
597 type Error = Error;
598
599 fn try_from(t: Bytes) -> Result<Self, Self::Error> {
600 Self::from_shared(t)
601 }
602}
603
604impl TryFrom<String> for Endpoint {
605 type Error = Error;
606
607 fn try_from(t: String) -> Result<Self, Self::Error> {
608 Self::from_shared(t.into_bytes())
609 }
610}
611
612impl TryFrom<&'static str> for Endpoint {
613 type Error = Error;
614
615 fn try_from(t: &'static str) -> Result<Self, Self::Error> {
616 Self::from_shared(t.as_bytes())
617 }
618}
619
620impl fmt::Debug for Endpoint {
621 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622 f.debug_struct("Endpoint").finish()
623 }
624}
625
626impl FromStr for Endpoint {
627 type Err = Error;
628
629 fn from_str(s: &str) -> Result<Self, Self::Err> {
630 Self::try_from(s.to_string())
631 }
632}