tonic/transport/channel/
mod.rs1mod endpoint;
4pub(crate) mod service;
5#[cfg(feature = "_tls-any")]
6mod tls;
7mod uds_connector;
8
9pub use self::service::Change;
10pub use endpoint::Endpoint;
11#[cfg(feature = "_tls-any")]
12pub use tls::ClientTlsConfig;
13
14use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
15use crate::body::Body;
16use bytes::Bytes;
17use http::{
18 uri::{InvalidUri, Uri},
19 Request, Response,
20};
21use std::{
22 fmt,
23 future::Future,
24 hash::Hash,
25 pin::Pin,
26 task::{Context, Poll},
27};
28use tokio::sync::mpsc::{channel, Sender};
29
30use hyper::rt;
31use tower::balance::p2c::Balance;
32use tower::{
33 buffer::{future::ResponseFuture as BufferResponseFuture, Buffer},
34 discover::Discover,
35 util::BoxService,
36 Service,
37};
38
/// A heap-allocated, pinned, `Send` future yielding `T` that may borrow for `'a`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
40
/// Buffer size used when an `Endpoint` does not set `buffer_size`, and for the
/// buffer in front of a load-balanced channel.
const DEFAULT_BUFFER_SIZE: usize = 1024;
42
43#[derive(Clone)]
67pub struct Channel {
68 svc: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
69}
70
71pub struct ResponseFuture {
75 inner: BufferResponseFuture<BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
76}
77
78impl Channel {
79 pub fn builder(uri: Uri) -> Endpoint {
81 Endpoint::from(uri)
82 }
83
84 pub fn from_static(s: &'static str) -> Endpoint {
91 let uri = Uri::from_static(s);
92 Self::builder(uri)
93 }
94
95 pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
102 let uri = Uri::from_maybe_shared(s.into())?;
103 Ok(Self::builder(uri))
104 }
105
106 pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
111 let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
112 list.for_each(|endpoint| {
113 tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
114 .unwrap();
115 });
116
117 channel
118 }
119
120 pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
124 where
125 K: Hash + Eq + Send + Clone + 'static,
126 {
127 Self::balance_channel_with_executor(capacity, SharedExec::tokio())
128 }
129
130 pub fn balance_channel_with_executor<K, E>(
136 capacity: usize,
137 executor: E,
138 ) -> (Self, Sender<Change<K, Endpoint>>)
139 where
140 K: Hash + Eq + Send + Clone + 'static,
141 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
142 {
143 let (tx, rx) = channel(capacity);
144 let list = DynamicServiceStream::new(rx);
145 (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
146 }
147
148 pub fn new<C>(connector: C, endpoint: Endpoint) -> Self
152 where
153 C: Service<Uri> + Send + 'static,
154 C::Error: Into<crate::BoxError> + Send,
155 C::Future: Send,
156 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
157 {
158 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
159 let executor = endpoint.executor.clone();
160
161 let svc = Connection::lazy(connector, endpoint);
162 let (svc, worker) = Buffer::pair(svc, buffer_size);
163
164 executor.execute(worker);
165
166 Channel { svc }
167 }
168
169 pub async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
173 where
174 C: Service<Uri> + Send + 'static,
175 C::Error: Into<crate::BoxError> + Send,
176 C::Future: Unpin + Send,
177 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
178 {
179 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
180 let executor = endpoint.executor.clone();
181
182 let svc = Connection::connect(connector, endpoint)
183 .await
184 .map_err(super::Error::from_source)?;
185 let (svc, worker) = Buffer::pair(svc, buffer_size);
186 executor.execute(worker);
187
188 Ok(Channel { svc })
189 }
190
191 pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
192 where
193 D: Discover<Service = Connection> + Unpin + Send + 'static,
194 D::Error: Into<crate::BoxError>,
195 D::Key: Hash + Send + Clone,
196 E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
197 {
198 let svc = Balance::new(discover);
199
200 let svc = BoxService::new(svc);
201 let (svc, worker) = Buffer::pair(svc, buffer_size);
202 executor.execute(Box::pin(worker));
203
204 Channel { svc }
205 }
206}
207
208impl Service<http::Request<Body>> for Channel {
209 type Response = http::Response<Body>;
210 type Error = super::Error;
211 type Future = ResponseFuture;
212
213 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
214 Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
215 }
216
217 fn call(&mut self, request: http::Request<Body>) -> Self::Future {
218 let inner = Service::call(&mut self.svc, request);
219
220 ResponseFuture { inner }
221 }
222}
223
224impl Future for ResponseFuture {
225 type Output = Result<Response<Body>, super::Error>;
226
227 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228 Pin::new(&mut self.inner)
229 .poll(cx)
230 .map_err(super::Error::from_source)
231 }
232}
233
234impl fmt::Debug for Channel {
235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 f.debug_struct("Channel").finish()
237 }
238}
239
240impl fmt::Debug for ResponseFuture {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 f.debug_struct("ResponseFuture").finish()
243 }
244}