tower_util/call_all/ordered.rs
1//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
2
3use super::{common, Error};
4use futures_core::Stream;
5use futures_util::stream::FuturesOrdered;
6use pin_project::pin_project;
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tower_service::Service;
13
14/// This is a `futures::Stream` of responses resulting from calling the wrapped `tower::Service`
15/// for each request received on the wrapped `Stream`.
16///
17/// ```rust
18/// # use std::task::{Poll, Context};
19/// # use std::cell::Cell;
20/// # use std::error::Error;
21/// # use std::rc::Rc;
22/// #
23/// use futures_util::future::{ready, Ready};
24/// use futures_util::StreamExt;
25/// use tower_service::Service;
26/// use tower_util::ServiceExt;
27/// use tokio::prelude::*;
28///
29/// // First, we need to have a Service to process our requests.
30/// #[derive(Debug, Eq, PartialEq)]
31/// struct FirstLetter;
32/// impl Service<&'static str> for FirstLetter {
33/// type Response = &'static str;
34/// type Error = Box<dyn Error + Send + Sync>;
35/// type Future = Ready<Result<Self::Response, Self::Error>>;
36///
37/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38/// Poll::Ready(Ok(()))
39/// }
40///
41/// fn call(&mut self, req: &'static str) -> Self::Future {
42/// ready(Ok(&req[..1]))
43/// }
44/// }
45///
46/// #[tokio::main]
47/// async fn main() {
48/// // Next, we need a Stream of requests.
49/// let (mut reqs, rx) = tokio::sync::mpsc::unbounded_channel();
50/// // Note that we have to help Rust out here by telling it what error type to use.
51/// // Specifically, it has to be From<Service::Error> + From<Stream::Error>.
52/// let mut rsps = FirstLetter.call_all(rx);
53///
54/// // Now, let's send a few requests and then check that we get the corresponding responses.
55/// reqs.send("one");
56/// reqs.send("two");
57/// reqs.send("three");
58/// drop(reqs);
59///
60/// // We then loop over the response Strem that we get back from call_all.
61/// let mut i = 0usize;
62/// while let Some(rsp) = rsps.next().await {
63/// // Each response is a Result (we could also have used TryStream::try_next)
64/// match (i + 1, rsp.unwrap()) {
65/// (1, "o") |
66/// (2, "t") |
67/// (3, "t") => {}
68/// (n, i) => {
69/// unreachable!("{}. response was '{}'", n, i);
70/// }
71/// }
72/// i += 1;
73/// }
74///
75/// // And at the end, we can get the Service back when there are no more requests.
76/// assert_eq!(rsps.into_inner(), FirstLetter);
77/// }
78/// ```
79#[pin_project]
80#[derive(Debug)]
81pub struct CallAll<Svc, S>
82where
83 Svc: Service<S::Item>,
84 S: Stream,
85{
86 #[pin]
87 inner: common::CallAll<Svc, S, FuturesOrdered<Svc::Future>>,
88}
89
90impl<Svc, S> CallAll<Svc, S>
91where
92 Svc: Service<S::Item>,
93 Svc::Error: Into<Error>,
94 S: Stream,
95{
96 /// Create new `CallAll` combinator.
97 ///
98 /// Each request yielded by `stread` is passed to `svc`, and the resulting responses are
99 /// yielded in the same order by the implementation of `Stream` for `CallAll`.
100 pub fn new(service: Svc, stream: S) -> CallAll<Svc, S> {
101 CallAll {
102 inner: common::CallAll::new(service, stream, FuturesOrdered::new()),
103 }
104 }
105
106 /// Extract the wrapped `Service`.
107 ///
108 /// # Panics
109 ///
110 /// Panics if `take_service` was already called.
111 pub fn into_inner(self) -> Svc {
112 self.inner.into_inner()
113 }
114
115 /// Extract the wrapped `Service`.
116 ///
117 /// This `CallAll` can no longer be used after this function has been called.
118 ///
119 /// # Panics
120 ///
121 /// Panics if `take_service` was already called.
122 pub fn take_service(self: Pin<&mut Self>) -> Svc {
123 self.project().inner.take_service()
124 }
125
126 /// Return responses as they are ready, regardless of the initial order.
127 ///
128 /// This function must be called before the stream is polled.
129 ///
130 /// # Panics
131 ///
132 /// Panics if `poll` was called.
133 pub fn unordered(self) -> super::CallAllUnordered<Svc, S> {
134 self.inner.unordered()
135 }
136}
137
138impl<Svc, S> Stream for CallAll<Svc, S>
139where
140 Svc: Service<S::Item>,
141 Svc::Error: Into<Error>,
142 S: Stream,
143{
144 type Item = Result<Svc::Response, Error>;
145
146 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
147 self.project().inner.poll_next(cx)
148 }
149}
150
151impl<F: Future> common::Drive<F> for FuturesOrdered<F> {
152 fn is_empty(&self) -> bool {
153 FuturesOrdered::is_empty(self)
154 }
155
156 fn push(&mut self, future: F) {
157 FuturesOrdered::push(self, future)
158 }
159
160 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
161 Stream::poll_next(Pin::new(self), cx)
162 }
163}