tower_util/call_all/
ordered.rs

1//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
2
3use super::{common, Error};
4use futures_core::Stream;
5use futures_util::stream::FuturesOrdered;
6use pin_project::pin_project;
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tower_service::Service;
13
14/// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service`
15/// for each request received on the wrapped `Stream`.
16///
17/// ```rust
18/// # use std::task::{Poll, Context};
19/// # use std::cell::Cell;
20/// # use std::error::Error;
21/// # use std::rc::Rc;
22/// #
23/// use futures_util::future::{ready, Ready};
24/// use futures_util::StreamExt;
25/// use tower_service::Service;
26/// use tower_util::ServiceExt;
27/// use tokio::prelude::*;
28///
29/// // First, we need to have a Service to process our requests.
30/// #[derive(Debug, Eq, PartialEq)]
31/// struct FirstLetter;
32/// impl Service<&'static str> for FirstLetter {
33///      type Response = &'static str;
34///      type Error = Box<dyn Error + Send + Sync>;
35///      type Future = Ready<Result<Self::Response, Self::Error>>;
36///
37///      fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38///          Poll::Ready(Ok(()))
39///      }
40///
41///      fn call(&mut self, req: &'static str) -> Self::Future {
42///          ready(Ok(&req[..1]))
43///      }
44/// }
45///
46/// #[tokio::main]
47/// async fn main() {
48///     // Next, we need a Stream of requests.
49///     let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel();
50///     // Note that we have to help Rust out here by telling it what error type to use.
51///     // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
52///     let mut rsps = FirstLetter.call_all(rx);
53///
54///     // Now, let's send a few requests and then check that we get the corresponding responses.
55///     reqs.send("one");
56///     reqs.send("two");
57///     reqs.send("three");
58///     drop(reqs);
59///
60///     // We then loop over the response Strem that we get back from call_all.
61///     let mut i = 0usize;
62///     while let Some(rsp) = rsps.next().await {
63///         // Each response is a Result (we could also have used TryStream::try_next)
64///         match (i + 1, rsp.unwrap()) {
65///             (1, "o") |
66///             (2, "t") |
67///             (3, "t") => {}
68///             (n, i) => {
69///                 unreachable!("{}. response was '{}'", n, i);
70///             }
71///         }
72///         i += 1;
73///     }
74///
75///     // And at the end, we can get the Service back when there are no more requests.
76///     assert_eq!(rsps.into_inner(), FirstLetter);
77/// }
78/// ```
79#[pin_project]
80#[derive(Debug)]
81pub struct CallAll<Svc, S>
82where
83    Svc: Service<S::Item>,
84    S: Stream,
85{
86    #[pin]
87    inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
88}
89
90impl<Svc, S> CallAll<Svc, S>
91where
92    Svc: Service<S::Item>,
93    Svc::Error: Into<Error>,
94    S: Stream,
95{
96    /// Create new `CallAll` combinator.
97    ///
98    /// Each request yielded by `stread` is passed to `svc`, and the resulting responses are
99    /// yielded in the same order by the implementation of `Stream` for `CallAll`.
100    pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
101        CallAll {
102            inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
103        }
104    }
105
106    /// Extract the wrapped `Service`.
107    ///
108    /// # Panics
109    ///
110    /// Panics if `take_service` was already called.
111    pub fn into_inner(self) -> Svc {
112        self.inner.into_inner()
113    }
114
115    /// Extract the wrapped `Service`.
116    ///
117    /// This `CallAll` can no longer be used after this function has been called.
118    ///
119    /// # Panics
120    ///
121    /// Panics if `take_service` was already called.
122    pub fn take_service(self: Pin<&mut Self>) -> Svc {
123        self.project().inner.take_service()
124    }
125
126    /// Return responses as they are ready, regardless of the initial order.
127    ///
128    /// This function must be called before the stream is polled.
129    ///
130    /// # Panics
131    ///
132    /// Panics if `poll` was called.
133    pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
134        self.inner.unordered()
135    }
136}
137
138impl<Svc, S> Stream for CallAll<Svc, S>
139where
140    Svc: Service<S::Item>,
141    Svc::Error: Into<Error>,
142    S: Stream,
143{
144    type Item = Result<Svc::Response, Error>;
145
146    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147        self.project().inner.poll_next(cx)
148    }
149}
150
151impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
152    fn is_empty(&self) -> bool {
153        FuturesOrdered::is_empty(self)
154    }
155
156    fn push(&mut self, future: F) {
157        FuturesOrdered::push(self, future)
158    }
159
160    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
161        Stream::poll_next(Pin::new(self), cx)
162    }
163}