tower_util/call_all/
unordered.rs1use super::{common, Error};
4use futures_core::Stream;
5use futures_util::stream::FuturesUnordered;
6use pin_project::pin_project;
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tower_service::Service;
13
14#[pin_project]
19#[derive(Debug)]
20pub struct CallAllUnordered<Svc, S>
21where
22 Svc: Service<S::Item>,
23 S: Stream,
24{
25 #[pin]
26 inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
27}
28
29impl<Svc, S> CallAllUnordered<Svc, S>
30where
31 Svc: Service<S::Item>,
32 Svc::Error: Into<Error>,
33 S: Stream,
34{
35 pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
41 CallAllUnordered {
42 inner: common::CallAll::new(service, stream, FuturesUnordered::new()),
43 }
44 }
45
46 pub fn into_inner(self) -> Svc {
52 self.inner.into_inner()
53 }
54
55 pub fn take_service(self: Pin<&mut Self>) -> Svc {
63 self.project().inner.take_service()
64 }
65}
66
67impl<Svc, S> Stream for CallAllUnordered<Svc, S>
68where
69 Svc: Service<S::Item>,
70 Svc::Error: Into<Error>,
71 S: Stream,
72{
73 type Item = Result<Svc::Response, Error>;
74
75 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 self.project().inner.poll_next(cx)
77 }
78}
79
80impl<F: Future> common::Drive<F> for FuturesUnordered<F> {
81 fn is_empty(&self) -> bool {
82 FuturesUnordered::is_empty(self)
83 }
84
85 fn push(&mut self, future: F) {
86 FuturesUnordered::push(self, future)
87 }
88
89 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
90 Stream::poll_next(Pin::new(self), cx)
91 }
92}