broker_tokio/stream/
collect.rs

1use crate::stream::Stream;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use core::future::Future;
5use core::mem;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10// Do not export this struct until `FromStream` can be unsealed.
11pin_project! {
12    /// Future returned by the [`collect`](super::StreamExt::collect) method.
13    #[must_use = "streams do nothing unless polled"]
14    #[derive(Debug)]
15    pub struct Collect<T, U>
16    where
17        T: Stream,
18        U: FromStream<T::Item>,
19    {
20        #[pin]
21        stream: T,
22        collection: U::Collection,
23    }
24}
25
26/// Convert from a [`Stream`](crate::stream::Stream).
27///
28/// This trait is not intended to be used directly. Instead, call
29/// [`StreamExt::collect()`](super::StreamExt::collect).
30///
31/// # Implementing
32///
33/// Currently, this trait may not be implemented by third parties. The trait is
34/// sealed in order to make changes in the future. Stabilization is pending
35/// enhancements to the Rust langague.
36pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
37
38impl<T, U> Collect<T, U>
39where
40    T: Stream,
41    U: FromStream<T::Item>,
42{
43    pub(super) fn new(stream: T) -> Collect<T, U> {
44        let (lower, upper) = stream.size_hint();
45        let collection = U::initialize(lower, upper);
46
47        Collect { stream, collection }
48    }
49}
50
51impl<T, U> Future for Collect<T, U>
52where
53    T: Stream,
54    U: FromStream<T::Item>,
55{
56    type Output = U;
57
58    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
59        use Poll::Ready;
60
61        loop {
62            let mut me = self.as_mut().project();
63
64            let item = match ready!(me.stream.poll_next(cx)) {
65                Some(item) => item,
66                None => {
67                    return Ready(U::finalize(&mut me.collection));
68                }
69            };
70
71            if !U::extend(&mut me.collection, item) {
72                return Ready(U::finalize(&mut me.collection));
73            }
74        }
75    }
76}
77
// ===== FromStream implementations
79
80impl FromStream<()> for () {}
81
82impl sealed::FromStreamPriv<()> for () {
83    type Collection = ();
84
85    fn initialize(_lower: usize, _upper: Option<usize>) {}
86
87    fn extend(_collection: &mut (), _item: ()) -> bool {
88        true
89    }
90
91    fn finalize(_collection: &mut ()) {}
92}
93
94impl<T: AsRef<str>> FromStream<T> for String {}
95
96impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
97    type Collection = String;
98
99    fn initialize(_lower: usize, _upper: Option<usize>) -> String {
100        String::new()
101    }
102
103    fn extend(collection: &mut String, item: T) -> bool {
104        collection.push_str(item.as_ref());
105        true
106    }
107
108    fn finalize(collection: &mut String) -> String {
109        mem::replace(collection, String::new())
110    }
111}
112
113impl<T> FromStream<T> for Vec<T> {}
114
115impl<T> sealed::FromStreamPriv<T> for Vec<T> {
116    type Collection = Vec<T>;
117
118    fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> {
119        Vec::with_capacity(lower)
120    }
121
122    fn extend(collection: &mut Vec<T>, item: T) -> bool {
123        collection.push(item);
124        true
125    }
126
127    fn finalize(collection: &mut Vec<T>) -> Vec<T> {
128        mem::replace(collection, vec![])
129    }
130}
131
132impl<T> FromStream<T> for Box<[T]> {}
133
134impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
135    type Collection = Vec<T>;
136
137    fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> {
138        <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper)
139    }
140
141    fn extend(collection: &mut Vec<T>, item: T) -> bool {
142        <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item)
143    }
144
145    fn finalize(collection: &mut Vec<T>) -> Box<[T]> {
146        <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice()
147    }
148}
149
150impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
151
152impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
153where
154    U: FromStream<T>,
155{
156    type Collection = Result<U::Collection, E>;
157
158    fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> {
159        Ok(U::initialize(lower, upper))
160    }
161
162    fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool {
163        assert!(collection.is_ok());
164        match item {
165            Ok(item) => {
166                let collection = collection.as_mut().ok().expect("invalid state");
167                U::extend(collection, item)
168            }
169            Err(err) => {
170                *collection = Err(err);
171                false
172            }
173        }
174    }
175
176    fn finalize(collection: &mut Self::Collection) -> Result<U, E> {
177        if let Ok(collection) = collection.as_mut() {
178            Ok(U::finalize(collection))
179        } else {
180            let res = mem::replace(collection, Ok(U::initialize(0, Some(0))));
181
182            if let Err(err) = res {
183                Err(err)
184            } else {
185                unreachable!();
186            }
187        }
188    }
189}
190
191impl<T: Buf> FromStream<T> for Bytes {}
192
193impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
194    type Collection = BytesMut;
195
196    fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
197        BytesMut::new()
198    }
199
200    fn extend(collection: &mut BytesMut, item: T) -> bool {
201        collection.put(item);
202        true
203    }
204
205    fn finalize(collection: &mut BytesMut) -> Bytes {
206        mem::replace(collection, BytesMut::new()).freeze()
207    }
208}
209
210impl<T: Buf> FromStream<T> for BytesMut {}
211
212impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
213    type Collection = BytesMut;
214
215    fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
216        BytesMut::new()
217    }
218
219    fn extend(collection: &mut BytesMut, item: T) -> bool {
220        collection.put(item);
221        true
222    }
223
224    fn finalize(collection: &mut BytesMut) -> BytesMut {
225        mem::replace(collection, BytesMut::new())
226    }
227}
228
pub(crate) mod sealed {
    /// Private half of `FromStream`: the three steps a collection goes
    /// through while items are being gathered.
    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate state held while the collection is in progress.
        type Collection;

        /// Creates the intermediate state from a lower and an optional upper
        /// size bound.
        fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection;

        /// Adds one received item to the intermediate state.
        ///
        /// Returns `true` to continue streaming, or `false` to complete the
        /// collection.
        fn extend(collection: &mut Self::Collection, item: T) -> bool;

        /// Converts the intermediate state into the target type.
        fn finalize(collection: &mut Self::Collection) -> Self;
    }
}