tonic/transport/channel/
endpoint.rs

1#[cfg(feature = "_tls-any")]
2use super::service::TlsConnector;
3use super::service::{self, Executor, SharedExec};
4use super::uds_connector::UdsConnector;
5use super::Channel;
6#[cfg(feature = "_tls-any")]
7use super::ClientTlsConfig;
8#[cfg(feature = "_tls-any")]
9use crate::transport::error;
10use crate::transport::Error;
11use bytes::Bytes;
12use http::{uri::Uri, HeaderValue};
13use hyper::rt;
14use hyper_util::client::legacy::connect::HttpConnector;
15use std::{fmt, future::Future, net::IpAddr, pin::Pin, str, str::FromStr, time::Duration};
16use tower_service::Service;
17
/// The kind of target an [`Endpoint`] connects to.
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum EndpointType {
    /// A regular URI; `Endpoint::connect` dials it through the HTTP connector.
    Uri(Uri),
    /// Path of a Unix domain socket, i.e. the input with its `unix:` / `unix://`
    /// prefix already removed; `Endpoint::connect` dials it through `UdsConnector`.
    Uds(String),
}
23
/// Channel builder.
///
/// This struct is used to build and configure HTTP/2 channels.
///
/// Every optional setting starts out unset (`None`) and is filled in through the
/// builder-style methods of the same name; `tcp_nodelay` starts out `true` and
/// `executor` starts out as `SharedExec::tokio()`.
#[derive(Clone)]
pub struct Endpoint {
    /// Connection target: a URI or a Unix domain socket path.
    pub(crate) uri: EndpointType,
    /// URI returned by [`Endpoint::uri`] when the target is a Unix domain socket
    /// (`http://tonic` in that case); for URI targets it is a copy of the target.
    fallback_uri: Uri,
    /// Origin override set by [`Endpoint::origin`].
    pub(crate) origin: Option<Uri>,
    /// Custom user-agent header value set by [`Endpoint::user_agent`].
    pub(crate) user_agent: Option<HeaderValue>,
    /// Per-request timeout set by [`Endpoint::timeout`].
    pub(crate) timeout: Option<Duration>,
    /// Concurrency limit set by [`Endpoint::concurrency_limit`].
    pub(crate) concurrency_limit: Option<usize>,
    /// Rate limit as `(limit, duration)` set by [`Endpoint::rate_limit`].
    pub(crate) rate_limit: Option<(u64, Duration)>,
    /// TLS connector built by [`Endpoint::tls_config`]; cloned into every connector.
    #[cfg(feature = "_tls-any")]
    pub(crate) tls: Option<TlsConnector>,
    /// Buffer size set by [`Endpoint::buffer_size`].
    pub(crate) buffer_size: Option<usize>,
    /// HTTP/2 stream window size set by [`Endpoint::initial_stream_window_size`].
    pub(crate) init_stream_window_size: Option<u32>,
    /// HTTP/2 connection window size set by [`Endpoint::initial_connection_window_size`].
    pub(crate) init_connection_window_size: Option<u32>,
    /// TCP keepalive idle time; applied to the `HttpConnector` in `http_connector`.
    pub(crate) tcp_keepalive: Option<Duration>,
    /// TCP keepalive probe interval; applied to the `HttpConnector` in `http_connector`.
    pub(crate) tcp_keepalive_interval: Option<Duration>,
    /// TCP keepalive retry count; applied to the `HttpConnector` in `http_connector`.
    pub(crate) tcp_keepalive_retries: Option<u32>,
    /// `TCP_NODELAY` flag; applied to the `HttpConnector` in `http_connector`.
    pub(crate) tcp_nodelay: bool,
    // The `http2_*` settings below are only stored in this file; they are read
    // elsewhere in the crate (presumably when the HTTP/2 connection is built).
    pub(crate) http2_keep_alive_interval: Option<Duration>,
    pub(crate) http2_keep_alive_timeout: Option<Duration>,
    pub(crate) http2_keep_alive_while_idle: Option<bool>,
    pub(crate) http2_max_header_list_size: Option<u32>,
    /// Connect timeout; applied to the `HttpConnector` and to custom connectors.
    pub(crate) connect_timeout: Option<Duration>,
    pub(crate) http2_adaptive_window: Option<bool>,
    /// Local IP address to bind; applied to the `HttpConnector` in `http_connector`.
    pub(crate) local_address: Option<IpAddr>,
    /// Executor set by [`Endpoint::executor`].
    pub(crate) executor: SharedExec,
}
54
55impl Endpoint {
56    // FIXME: determine if we want to expose this or not. This is really
57    // just used in codegen for a shortcut.
58    #[doc(hidden)]
59    pub fn new<D>(dst: D) -> Result<Self, Error>
60    where
61        D: TryInto<Self>,
62        D::Error: Into<crate::BoxError>,
63    {
64        let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
65        #[cfg(feature = "_tls-any")]
66        if let EndpointType::Uri(uri) = &me.uri {
67            if me.tls.is_none() && uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
68                return me.tls_config(ClientTlsConfig::new().with_enabled_roots());
69            }
70        }
71        Ok(me)
72    }
73
74    fn new_uri(uri: Uri) -> Self {
75        Self {
76            uri: EndpointType::Uri(uri.clone()),
77            fallback_uri: uri,
78            origin: None,
79            user_agent: None,
80            concurrency_limit: None,
81            rate_limit: None,
82            timeout: None,
83            #[cfg(feature = "_tls-any")]
84            tls: None,
85            buffer_size: None,
86            init_stream_window_size: None,
87            init_connection_window_size: None,
88            tcp_keepalive: None,
89            tcp_keepalive_interval: None,
90            tcp_keepalive_retries: None,
91            tcp_nodelay: true,
92            http2_keep_alive_interval: None,
93            http2_keep_alive_timeout: None,
94            http2_keep_alive_while_idle: None,
95            http2_max_header_list_size: None,
96            connect_timeout: None,
97            http2_adaptive_window: None,
98            executor: SharedExec::tokio(),
99            local_address: None,
100        }
101    }
102
103    fn new_uds(uds_filepath: &str) -> Self {
104        Self {
105            uri: EndpointType::Uds(uds_filepath.to_string()),
106            fallback_uri: Uri::from_static("http://tonic"),
107            origin: None,
108            user_agent: None,
109            concurrency_limit: None,
110            rate_limit: None,
111            timeout: None,
112            #[cfg(feature = "_tls-any")]
113            tls: None,
114            buffer_size: None,
115            init_stream_window_size: None,
116            init_connection_window_size: None,
117            tcp_keepalive: None,
118            tcp_keepalive_interval: None,
119            tcp_keepalive_retries: None,
120            tcp_nodelay: true,
121            http2_keep_alive_interval: None,
122            http2_keep_alive_timeout: None,
123            http2_keep_alive_while_idle: None,
124            http2_max_header_list_size: None,
125            connect_timeout: None,
126            http2_adaptive_window: None,
127            executor: SharedExec::tokio(),
128            local_address: None,
129        }
130    }
131
132    /// Convert an `Endpoint` from a static string.
133    ///
134    /// # Panics
135    ///
136    /// This function panics if the argument is an invalid URI.
137    ///
138    /// ```
139    /// # use tonic::transport::Endpoint;
140    /// Endpoint::from_static("https://example.com");
141    /// ```
142    pub fn from_static(s: &'static str) -> Self {
143        if s.starts_with("unix:") {
144            let uds_filepath = s
145                .strip_prefix("unix://")
146                .or_else(|| s.strip_prefix("unix:"))
147                .expect("Invalid unix domain socket URI");
148            Self::new_uds(uds_filepath)
149        } else {
150            let uri = Uri::from_static(s);
151            Self::new_uri(uri)
152        }
153    }
154
155    /// Convert an `Endpoint` from shared bytes.
156    ///
157    /// ```
158    /// # use tonic::transport::Endpoint;
159    /// Endpoint::from_shared("https://example.com".to_string());
160    /// ```
161    pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
162        let s = str::from_utf8(&s.into())
163            .map_err(|e| Error::new_invalid_uri().with(e))?
164            .to_string();
165        if s.starts_with("unix:") {
166            let uds_filepath = s
167                .strip_prefix("unix://")
168                .or_else(|| s.strip_prefix("unix:"))
169                .ok_or(Error::new_invalid_uri())?;
170            Ok(Self::new_uds(uds_filepath))
171        } else {
172            let uri = Uri::from_maybe_shared(s).map_err(|e| Error::new_invalid_uri().with(e))?;
173            Ok(Self::from(uri))
174        }
175    }
176
177    /// Set a custom user-agent header.
178    ///
179    /// `user_agent` will be prepended to Tonic's default user-agent string (`tonic/x.x.x`).
180    /// It must be a value that can be converted into a valid  `http::HeaderValue` or building
181    /// the endpoint will fail.
182    /// ```
183    /// # use tonic::transport::Endpoint;
184    /// # let mut builder = Endpoint::from_static("https://example.com");
185    /// builder.user_agent("Greeter").expect("Greeter should be a valid header value");
186    /// // user-agent: "Greeter tonic/x.x.x"
187    /// ```
188    pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
189    where
190        T: TryInto<HeaderValue>,
191    {
192        user_agent
193            .try_into()
194            .map(|ua| Endpoint {
195                user_agent: Some(ua),
196                ..self
197            })
198            .map_err(|_| Error::new_invalid_user_agent())
199    }
200
201    /// Set a custom origin.
202    ///
203    /// Override the `origin`, mainly useful when you are reaching a Server/LoadBalancer
204    /// which serves multiple services at the same time.
205    /// It will play the role of SNI (Server Name Indication).
206    ///
207    /// ```
208    /// # use tonic::transport::Endpoint;
209    /// # let mut builder = Endpoint::from_static("https://proxy.com");
210    /// builder.origin("https://example.com".parse().expect("http://example.com must be a valid URI"));
211    /// // origin: "https://example.com"
212    /// ```
213    pub fn origin(self, origin: Uri) -> Self {
214        Endpoint {
215            origin: Some(origin),
216            ..self
217        }
218    }
219
220    /// Apply a timeout to each request.
221    ///
222    /// ```
223    /// # use tonic::transport::Endpoint;
224    /// # use std::time::Duration;
225    /// # let mut builder = Endpoint::from_static("https://example.com");
226    /// builder.timeout(Duration::from_secs(5));
227    /// ```
228    ///
229    /// # Notes
230    ///
231    /// This does **not** set the timeout metadata (`grpc-timeout` header) on
232    /// the request, meaning the server will not be informed of this timeout,
233    /// for that use [`Request::set_timeout`].
234    ///
235    /// [`Request::set_timeout`]: crate::Request::set_timeout
236    pub fn timeout(self, dur: Duration) -> Self {
237        Endpoint {
238            timeout: Some(dur),
239            ..self
240        }
241    }
242
243    /// Apply a timeout to connecting to the uri.
244    ///
245    /// Defaults to no timeout.
246    ///
247    /// ```
248    /// # use tonic::transport::Endpoint;
249    /// # use std::time::Duration;
250    /// # let mut builder = Endpoint::from_static("https://example.com");
251    /// builder.connect_timeout(Duration::from_secs(5));
252    /// ```
253    pub fn connect_timeout(self, dur: Duration) -> Self {
254        Endpoint {
255            connect_timeout: Some(dur),
256            ..self
257        }
258    }
259
260    /// Set whether TCP keepalive messages are enabled on accepted connections.
261    ///
262    /// If `None` is specified, keepalive is disabled, otherwise the duration
263    /// specified will be the time to remain idle before sending TCP keepalive
264    /// probes.
265    ///
266    /// Default is no keepalive (`None`)
267    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
268        Endpoint {
269            tcp_keepalive,
270            ..self
271        }
272    }
273
274    /// Set the duration between two successive TCP keepalive retransmissions,
275    /// if acknowledgement to the previous keepalive transmission is not received.
276    ///
277    /// This is only used if `tcp_keepalive` is not None.
278    ///
279    /// Defaults to None, which is the system default.
280    pub fn tcp_keepalive_interval(self, tcp_keepalive_interval: Option<Duration>) -> Self {
281        Endpoint {
282            tcp_keepalive_interval,
283            ..self
284        }
285    }
286
287    /// Set the number of retransmissions to be carried out before declaring that remote end is not available.
288    ///
289    /// This is only used if `tcp_keepalive` is not None.
290    ///
291    /// Defaults to None, which is the system default.
292    pub fn tcp_keepalive_retries(self, tcp_keepalive_retries: Option<u32>) -> Self {
293        Endpoint {
294            tcp_keepalive_retries,
295            ..self
296        }
297    }
298
299    /// Apply a concurrency limit to each request.
300    ///
301    /// ```
302    /// # use tonic::transport::Endpoint;
303    /// # let mut builder = Endpoint::from_static("https://example.com");
304    /// builder.concurrency_limit(256);
305    /// ```
306    pub fn concurrency_limit(self, limit: usize) -> Self {
307        Endpoint {
308            concurrency_limit: Some(limit),
309            ..self
310        }
311    }
312
313    /// Apply a rate limit to each request.
314    ///
315    /// ```
316    /// # use tonic::transport::Endpoint;
317    /// # use std::time::Duration;
318    /// # let mut builder = Endpoint::from_static("https://example.com");
319    /// builder.rate_limit(32, Duration::from_secs(1));
320    /// ```
321    pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
322        Endpoint {
323            rate_limit: Some((limit, duration)),
324            ..self
325        }
326    }
327
328    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
329    /// stream-level flow control.
330    ///
331    /// Default is 65,535
332    ///
333    /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
334    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
335        Endpoint {
336            init_stream_window_size: sz.into(),
337            ..self
338        }
339    }
340
341    /// Sets the max connection-level flow control for HTTP2
342    ///
343    /// Default is 65,535
344    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
345        Endpoint {
346            init_connection_window_size: sz.into(),
347            ..self
348        }
349    }
350
351    /// Sets the tower service default internal buffer size
352    ///
353    /// Default is 1024
354    pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
355        Endpoint {
356            buffer_size: sz.into(),
357            ..self
358        }
359    }
360
361    /// Configures TLS for the endpoint.
362    #[cfg(feature = "_tls-any")]
363    pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
364        match &self.uri {
365            EndpointType::Uri(uri) => Ok(Endpoint {
366                tls: Some(
367                    tls_config
368                        .into_tls_connector(uri)
369                        .map_err(Error::from_source)?,
370                ),
371                ..self
372            }),
373            EndpointType::Uds(_) => Err(Error::new(error::Kind::InvalidTlsConfigForUds)),
374        }
375    }
376
377    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
378    pub fn tcp_nodelay(self, enabled: bool) -> Self {
379        Endpoint {
380            tcp_nodelay: enabled,
381            ..self
382        }
383    }
384
385    /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise.
386    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
387        Endpoint {
388            http2_keep_alive_interval: Some(interval),
389            ..self
390        }
391    }
392
393    /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise.
394    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
395        Endpoint {
396            http2_keep_alive_timeout: Some(duration),
397            ..self
398        }
399    }
400
401    /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise.
402    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
403        Endpoint {
404            http2_keep_alive_while_idle: Some(enabled),
405            ..self
406        }
407    }
408
409    /// Sets whether to use an adaptive flow control. Uses `hyper`'s default otherwise.
410    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
411        Endpoint {
412            http2_adaptive_window: Some(enabled),
413            ..self
414        }
415    }
416
417    /// Sets the max size of received header frames.
418    ///
419    /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
420    pub fn http2_max_header_list_size(self, size: u32) -> Self {
421        Endpoint {
422            http2_max_header_list_size: Some(size),
423            ..self
424        }
425    }
426
427    /// Sets the executor used to spawn async tasks.
428    ///
429    /// Uses `tokio::spawn` by default.
430    pub fn executor<E>(mut self, executor: E) -> Self
431    where
432        E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
433    {
434        self.executor = SharedExec::new(executor);
435        self
436    }
437
438    pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
439        service::Connector::new(
440            c,
441            #[cfg(feature = "_tls-any")]
442            self.tls.clone(),
443        )
444    }
445
446    /// Set the local address.
447    ///
448    /// This sets the IP address the client will use. By default we let hyper select the IP address.
449    pub fn local_address(self, addr: Option<IpAddr>) -> Self {
450        Endpoint {
451            local_address: addr,
452            ..self
453        }
454    }
455
456    pub(crate) fn http_connector(&self) -> service::Connector<HttpConnector> {
457        let mut http = HttpConnector::new();
458        http.enforce_http(false);
459        http.set_nodelay(self.tcp_nodelay);
460        http.set_keepalive(self.tcp_keepalive);
461        http.set_keepalive_interval(self.tcp_keepalive_interval);
462        http.set_keepalive_retries(self.tcp_keepalive_retries);
463        http.set_connect_timeout(self.connect_timeout);
464        http.set_local_address(self.local_address);
465        self.connector(http)
466    }
467
468    pub(crate) fn uds_connector(&self, uds_filepath: &str) -> service::Connector<UdsConnector> {
469        self.connector(UdsConnector::new(uds_filepath))
470    }
471
472    /// Create a channel from this config.
473    pub async fn connect(&self) -> Result<Channel, Error> {
474        match &self.uri {
475            EndpointType::Uri(_) => Channel::connect(self.http_connector(), self.clone()).await,
476            EndpointType::Uds(uds_filepath) => {
477                Channel::connect(self.uds_connector(uds_filepath.as_str()), self.clone()).await
478            }
479        }
480    }
481
482    /// Create a channel from this config.
483    ///
484    /// The channel returned by this method does not attempt to connect to the endpoint until first
485    /// use.
486    pub fn connect_lazy(&self) -> Channel {
487        match &self.uri {
488            EndpointType::Uri(_) => Channel::new(self.http_connector(), self.clone()),
489            EndpointType::Uds(uds_filepath) => {
490                Channel::new(self.uds_connector(uds_filepath.as_str()), self.clone())
491            }
492        }
493    }
494
495    /// Connect with a custom connector.
496    ///
497    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport.
498    /// See the `uds` example for an example on how to use this function to build channel that
499    /// uses a Unix socket transport.
500    ///
501    /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
502    pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
503    where
504        C: Service<Uri> + Send + 'static,
505        C::Response: rt::Read + rt::Write + Send + Unpin,
506        C::Future: Send,
507        crate::BoxError: From<C::Error> + Send,
508    {
509        let connector = self.connector(connector);
510
511        if let Some(connect_timeout) = self.connect_timeout {
512            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
513            connector.set_connect_timeout(Some(connect_timeout));
514            Channel::connect(connector, self.clone()).await
515        } else {
516            Channel::connect(connector, self.clone()).await
517        }
518    }
519
520    /// Connect with a custom connector lazily.
521    ///
522    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport
523    /// connect to it lazily.
524    ///
525    /// See the `uds` example for an example on how to use this function to build channel that
526    /// uses a Unix socket transport.
527    pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
528    where
529        C: Service<Uri> + Send + 'static,
530        C::Response: rt::Read + rt::Write + Send + Unpin,
531        C::Future: Send,
532        crate::BoxError: From<C::Error> + Send,
533    {
534        let connector = self.connector(connector);
535        if let Some(connect_timeout) = self.connect_timeout {
536            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
537            connector.set_connect_timeout(Some(connect_timeout));
538            Channel::new(connector, self.clone())
539        } else {
540            Channel::new(connector, self.clone())
541        }
542    }
543
544    /// Get the endpoint uri.
545    ///
546    /// ```
547    /// # use tonic::transport::Endpoint;
548    /// # use http::Uri;
549    /// let endpoint = Endpoint::from_static("https://example.com");
550    ///
551    /// assert_eq!(endpoint.uri(), &Uri::from_static("https://example.com"));
552    /// ```
553    pub fn uri(&self) -> &Uri {
554        match &self.uri {
555            EndpointType::Uri(uri) => uri,
556            EndpointType::Uds(_) => &self.fallback_uri,
557        }
558    }
559
560    /// Get the value of `TCP_NODELAY` option for accepted connections.
561    pub fn get_tcp_nodelay(&self) -> bool {
562        self.tcp_nodelay
563    }
564
565    /// Get the connect timeout.
566    pub fn get_connect_timeout(&self) -> Option<Duration> {
567        self.connect_timeout
568    }
569
570    /// Get whether TCP keepalive messages are enabled on accepted connections.
571    ///
572    /// If `None` is specified, keepalive is disabled, otherwise the duration
573    /// specified will be the time to remain idle before sending TCP keepalive
574    /// probes.
575    pub fn get_tcp_keepalive(&self) -> Option<Duration> {
576        self.tcp_keepalive
577    }
578
579    /// Get whether TCP keepalive interval.
580    pub fn get_tcp_keepalive_interval(&self) -> Option<Duration> {
581        self.tcp_keepalive_interval
582    }
583
584    /// Get whether TCP keepalive retries.
585    pub fn get_tcp_keepalive_retries(&self) -> Option<u32> {
586        self.tcp_keepalive_retries
587    }
588}
589
590impl From<Uri> for Endpoint {
591    fn from(uri: Uri) -> Self {
592        Self::new_uri(uri)
593    }
594}
595
596impl TryFrom<Bytes> for Endpoint {
597    type Error = Error;
598
599    fn try_from(t: Bytes) -> Result<Self, Self::Error> {
600        Self::from_shared(t)
601    }
602}
603
604impl TryFrom<String> for Endpoint {
605    type Error = Error;
606
607    fn try_from(t: String) -> Result<Self, Self::Error> {
608        Self::from_shared(t.into_bytes())
609    }
610}
611
612impl TryFrom<&'static str> for Endpoint {
613    type Error = Error;
614
615    fn try_from(t: &'static str) -> Result<Self, Self::Error> {
616        Self::from_shared(t.as_bytes())
617    }
618}
619
620impl fmt::Debug for Endpoint {
621    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622        f.debug_struct("Endpoint").finish()
623    }
624}
625
626impl FromStr for Endpoint {
627    type Err = Error;
628
629    fn from_str(s: &str) -> Result<Self, Self::Err> {
630        Self::try_from(s.to_string())
631    }
632}