tower_util/call_all/
unordered.rs

1//! `Stream<Item = Request>` + `Service<Request>` => `Stream<Item = Response>`.
2
3use super::{common, Error};
4use futures_core::Stream;
5use futures_util::stream::FuturesUnordered;
6use pin_project::pin_project;
7use std::{
8    future::Future,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tower_service::Service;
13
14/// A stream of responses received from the inner service in received order.
15///
16/// Similar to `CallAll` except, instead of yielding responses in request order,
17/// responses are returned as they are available.
18#[pin_project]
19#[derive(Debug)]
20pub struct CallAllUnordered<Svc, S>
21where
22    Svc: Service<S::Item>,
23    S: Stream,
24{
25    #[pin]
26    inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
27}
28
29impl<Svc, S> CallAllUnordered<Svc, S>
30where
31    Svc: Service<S::Item>,
32    Svc::Error: Into<Error>,
33    S: Stream,
34{
35    /// Create new `CallAllUnordered` combinator.
36    ///
37    /// Each request yielded by `stread` is passed to `svc`, and the resulting responses are
38    /// yielded in the same order by the implementation of `Stream` for
39    /// `CallAllUnordered`.
40    pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
41        CallAllUnordered {
42            inner: common::CallAll::new(service, stream, FuturesUnordered::new()),
43        }
44    }
45
46    /// Extract the wrapped `Service`.
47    ///
48    /// # Panics
49    ///
50    /// Panics if `take_service` was already called.
51    pub fn into_inner(self) -> Svc {
52        self.inner.into_inner()
53    }
54
55    /// Extract the wrapped `Service`.
56    ///
57    /// This `CallAll` can no longer be used after this function has been called.
58    ///
59    /// # Panics
60    ///
61    /// Panics if `take_service` was already called.
62    pub fn take_service(self: Pin<&mut Self>) -> Svc {
63        self.project().inner.take_service()
64    }
65}
66
67impl<Svc, S> Stream for CallAllUnordered<Svc, S>
68where
69    Svc: Service<S::Item>,
70    Svc::Error: Into<Error>,
71    S: Stream,
72{
73    type Item = Result<Svc::Response, Error>;
74
75    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        self.project().inner.poll_next(cx)
77    }
78}
79
80impl<F: Future> common::Drive<F> for FuturesUnordered<F> {
81    fn is_empty(&self) -> bool {
82        FuturesUnordered::is_empty(self)
83    }
84
85    fn push(&mut self, future: F) {
86        FuturesUnordered::push(self, future)
87    }
88
89    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
90        Stream::poll_next(Pin::new(self), cx)
91    }
92}