tonic/transport/channel/
mod.rs

1//! Client implementation and builder.
2
3mod endpoint;
4pub(crate) mod service;
5#[cfg(feature = "_tls-any")]
6mod tls;
7mod uds_connector;
8
9pub use self::service::Change;
10pub use endpoint::Endpoint;
11#[cfg(feature = "_tls-any")]
12pub use tls::ClientTlsConfig;
13
14use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
15use crate::body::Body;
16use bytes::Bytes;
17use http::{
18    uri::{InvalidUri, Uri},
19    Request, Response,
20};
21use std::{
22    fmt,
23    future::Future,
24    hash::Hash,
25    pin::Pin,
26    task::{Context, Poll},
27};
28use tokio::sync::mpsc::{channel, Sender};
29
30use hyper::rt;
31use tower::balance::p2c::Balance;
32use tower::{
33    buffer::{future::ResponseFuture as BufferResponseFuture, Buffer},
34    discover::Discover,
35    util::BoxService,
36    Service,
37};
38
39type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
40
41const DEFAULT_BUFFER_SIZE: usize = 1024;
42
43/// A default batteries included `transport` channel.
44///
45/// This provides a fully featured http2 gRPC client based on `hyper`
46/// and `tower` services.
47///
48/// # Multiplexing requests
49///
50/// Sending a request on a channel requires a `&mut self` and thus can only send
51/// one request in flight. This is intentional and is required to follow the `Service`
52/// contract from the `tower` library which this channel implementation is built on
53/// top of.
54///
55/// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply
56/// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready`
57/// we know the `Service` is able to accept only one request before we must `poll_ready`
58/// again. Due to this fact any `async fn` that wants to poll for readiness and submit
59/// the request must have a `&mut self` reference.
60///
61/// To work around this and to ease the use of the channel, `Channel` provides a
62/// `Clone` implementation that is _cheap_. This is because at the very top level
63/// the channel is backed by a `tower_buffer::Buffer` which runs the connection
64/// in a background task and provides a `mpsc` channel interface. Due to this
65/// cloning the `Channel` type is cheap and encouraged.
66#[derive(Clone)]
67pub struct Channel {
68    svc: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
69}
70
71/// A future that resolves to an HTTP response.
72///
73/// This is returned by the `Service::call` on [`Channel`].
74pub struct ResponseFuture {
75    inner: BufferResponseFuture<BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
76}
77
78impl Channel {
79    /// Create an [`Endpoint`] builder that can create [`Channel`]s.
80    pub fn builder(uri: Uri) -> Endpoint {
81        Endpoint::from(uri)
82    }
83
84    /// Create an [`Endpoint`] from a static string.
85    ///
86    /// ```
87    /// # use tonic::transport::Channel;
88    /// Channel::from_static("https://example.com");
89    /// ```
90    pub fn from_static(s: &'static str) -> Endpoint {
91        let uri = Uri::from_static(s);
92        Self::builder(uri)
93    }
94
95    /// Create an [`Endpoint`] from shared bytes.
96    ///
97    /// ```
98    /// # use tonic::transport::Channel;
99    /// Channel::from_shared("https://example.com");
100    /// ```
101    pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
102        let uri = Uri::from_maybe_shared(s.into())?;
103        Ok(Self::builder(uri))
104    }
105
106    /// Balance a list of [`Endpoint`]'s.
107    ///
108    /// This creates a [`Channel`] that will load balance across all the
109    /// provided endpoints.
110    pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
111        let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
112        list.for_each(|endpoint| {
113            tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
114                .unwrap();
115        });
116
117        channel
118    }
119
120    /// Balance a list of [`Endpoint`]'s.
121    ///
122    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
123    pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
124    where
125        K: Hash + Eq + Send + Clone + 'static,
126    {
127        Self::balance_channel_with_executor(capacity, SharedExec::tokio())
128    }
129
130    /// Balance a list of [`Endpoint`]'s.
131    ///
132    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
133    ///
134    /// The [`Channel`] will use the given executor to spawn async tasks.
135    pub fn balance_channel_with_executor<K, E>(
136        capacity: usize,
137        executor: E,
138    ) -> (Self, Sender<Change<K, Endpoint>>)
139    where
140        K: Hash + Eq + Send + Clone + 'static,
141        E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
142    {
143        let (tx, rx) = channel(capacity);
144        let list = DynamicServiceStream::new(rx);
145        (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
146    }
147
148    /// Create a new [`Channel`] using a custom connector to the provided [Endpoint].
149    ///
150    /// This is a lower level API, prefer to use [`Endpoint::connect_lazy`] if you are not using a custom connector.
151    pub fn new<C>(connector: C, endpoint: Endpoint) -> Self
152    where
153        C: Service<Uri> + Send + 'static,
154        C::Error: Into<crate::BoxError> + Send,
155        C::Future: Send,
156        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
157    {
158        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
159        let executor = endpoint.executor.clone();
160
161        let svc = Connection::lazy(connector, endpoint);
162        let (svc, worker) = Buffer::pair(svc, buffer_size);
163
164        executor.execute(worker);
165
166        Channel { svc }
167    }
168
169    /// Connect to the provided [`Endpoint`] using the provided connector, and return a new [`Channel`].
170    ///
171    /// This is a lower level API, prefer to use [`Endpoint::connect`] if you are not using a custom connector.
172    pub async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
173    where
174        C: Service<Uri> + Send + 'static,
175        C::Error: Into<crate::BoxError> + Send,
176        C::Future: Unpin + Send,
177        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
178    {
179        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
180        let executor = endpoint.executor.clone();
181
182        let svc = Connection::connect(connector, endpoint)
183            .await
184            .map_err(super::Error::from_source)?;
185        let (svc, worker) = Buffer::pair(svc, buffer_size);
186        executor.execute(worker);
187
188        Ok(Channel { svc })
189    }
190
191    pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
192    where
193        D: Discover<Service = Connection> + Unpin + Send + 'static,
194        D::Error: Into<crate::BoxError>,
195        D::Key: Hash + Send + Clone,
196        E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
197    {
198        let svc = Balance::new(discover);
199
200        let svc = BoxService::new(svc);
201        let (svc, worker) = Buffer::pair(svc, buffer_size);
202        executor.execute(Box::pin(worker));
203
204        Channel { svc }
205    }
206}
207
208impl Service<http::Request<Body>> for Channel {
209    type Response = http::Response<Body>;
210    type Error = super::Error;
211    type Future = ResponseFuture;
212
213    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214        Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
215    }
216
217    fn call(&mut self, request: http::Request<Body>) -> Self::Future {
218        let inner = Service::call(&mut self.svc, request);
219
220        ResponseFuture { inner }
221    }
222}
223
224impl Future for ResponseFuture {
225    type Output = Result<Response<Body>, super::Error>;
226
227    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228        Pin::new(&mut self.inner)
229            .poll(cx)
230            .map_err(super::Error::from_source)
231    }
232}
233
234impl fmt::Debug for Channel {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        f.debug_struct("Channel").finish()
237    }
238}
239
240impl fmt::Debug for ResponseFuture {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        f.debug_struct("ResponseFuture").finish()
243    }
244}