tower_discover/
stream.rs

1use crate::{Change, Discover};
2use futures_core::{ready, TryStream};
3use pin_project::pin_project;
4use std::hash::Hash;
5use std::{
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tower_service::Service;
10
11/// Dynamic service discovery based on a stream of service changes.
12#[pin_project]
13#[derive(Debug)]
14pub struct ServiceStream<S> {
15    #[pin]
16    inner: S,
17}
18
19impl<S> ServiceStream<S> {
20    #[allow(missing_docs)]
21    pub fn new<K, Svc, Request>(services: S) -> Self
22    where
23        S: TryStream<Ok = Change<K, Svc>>,
24        K: Hash + Eq,
25        Svc: Service<Request>,
26    {
27        ServiceStream { inner: services }
28    }
29}
30
31impl<S, K, Svc> Discover for ServiceStream<S>
32where
33    K: Hash + Eq,
34    S: TryStream<Ok = Change<K, Svc>>,
35{
36    type Key = K;
37    type Service = Svc;
38    type Error = S::Error;
39
40    fn poll_discover(
41        self: Pin<&mut Self>,
42        cx: &mut Context<'_>,
43    ) -> Poll<Result<Change<Self::Key, Self::Service>, Self::Error>> {
44        match ready!(self.project().inner.try_poll_next(cx)).transpose()? {
45            Some(c) => Poll::Ready(Ok(c)),
46            None => {
47                // there are no more service changes coming
48                Poll::Pending
49            }
50        }
51    }
52}