tonic/transport/channel/service/
discover.rs

1use super::super::{Connection, Endpoint};
2
3use std::{
4    hash::Hash,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tokio::sync::mpsc::Receiver;
9use tokio_stream::Stream;
10use tower::discover::Change as TowerChange;
11
/// A single update to a set of services, where each service is keyed by `K`.
#[derive(Clone, Debug)]
pub enum Change<K, V> {
    /// A service `V` became available under the key `K`.
    Insert(K, V),
    /// The service registered under the key `K` is no longer available.
    Remove(K),
}
20
21pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
22    changes: Receiver<Change<K, Endpoint>>,
23}
24
25impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
26    pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
27        Self { changes }
28    }
29}
30
31impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
32    type Item = Result<TowerChange<K, Connection>, crate::BoxError>;
33
34    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35        match Pin::new(&mut self.changes).poll_recv(cx) {
36            Poll::Pending | Poll::Ready(None) => Poll::Pending,
37            Poll::Ready(Some(change)) => match change {
38                Change::Insert(k, endpoint) => {
39                    let connection = Connection::lazy(endpoint.http_connector(), endpoint);
40                    Poll::Ready(Some(Ok(TowerChange::Insert(k, connection))))
41                }
42                Change::Remove(k) => Poll::Ready(Some(Ok(TowerChange::Remove(k)))),
43            },
44        }
45    }
46}
47
48impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}