tokio_stream/stream_ext/
collect.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::PhantomPinned;
5use core::mem;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9
10// Do not export this struct until `FromStream` can be unsealed.
11pin_project! {
12    /// Future returned by the [`collect`](super::StreamExt::collect) method.
13    #[must_use = "futures do nothing unless you `.await` or poll them"]
14    #[derive(Debug)]
15    pub struct Collect<T, U>
16    where
17        T: Stream,
18        U: FromStream<T::Item>,
19    {
20        #[pin]
21        stream: T,
22        collection: U::InternalCollection,
23        // Make this future `!Unpin` for compatibility with async trait methods.
24        #[pin]
25        _pin: PhantomPinned,
26    }
27}
28
29/// Convert from a [`Stream`].
30///
31/// This trait is not intended to be used directly. Instead, call
32/// [`StreamExt::collect()`](super::StreamExt::collect).
33///
34/// # Implementing
35///
36/// Currently, this trait may not be implemented by third parties. The trait is
37/// sealed in order to make changes in the future. Stabilization is pending
38/// enhancements to the Rust language.
39pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
40
41impl<T, U> Collect<T, U>
42where
43    T: Stream,
44    U: FromStream<T::Item>,
45{
46    pub(super) fn new(stream: T) -> Collect<T, U> {
47        let (lower, upper) = stream.size_hint();
48        let collection = U::initialize(sealed::Internal, lower, upper);
49
50        Collect {
51            stream,
52            collection,
53            _pin: PhantomPinned,
54        }
55    }
56}
57
58impl<T, U> Future for Collect<T, U>
59where
60    T: Stream,
61    U: FromStream<T::Item>,
62{
63    type Output = U;
64
65    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
66        use Poll::Ready;
67
68        loop {
69            let me = self.as_mut().project();
70
71            let item = match ready!(me.stream.poll_next(cx)) {
72                Some(item) => item,
73                None => {
74                    return Ready(U::finalize(sealed::Internal, me.collection));
75                }
76            };
77
78            if !U::extend(sealed::Internal, me.collection, item) {
79                return Ready(U::finalize(sealed::Internal, me.collection));
80            }
81        }
82    }
83}
84
85// ===== FromStream implementations
86
87impl FromStream<()> for () {}
88
89impl sealed::FromStreamPriv<()> for () {
90    type InternalCollection = ();
91
92    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
93
94    fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
95        true
96    }
97
98    fn finalize(_: sealed::Internal, _collection: &mut ()) {}
99}
100
101impl<T: AsRef<str>> FromStream<T> for String {}
102
103impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
104    type InternalCollection = String;
105
106    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
107        String::new()
108    }
109
110    fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
111        collection.push_str(item.as_ref());
112        true
113    }
114
115    fn finalize(_: sealed::Internal, collection: &mut String) -> String {
116        mem::take(collection)
117    }
118}
119
120impl<T> FromStream<T> for Vec<T> {}
121
122impl<T> sealed::FromStreamPriv<T> for Vec<T> {
123    type InternalCollection = Vec<T>;
124
125    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
126        Vec::with_capacity(lower)
127    }
128
129    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
130        collection.push(item);
131        true
132    }
133
134    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
135        mem::take(collection)
136    }
137}
138
139impl<T> FromStream<T> for Box<[T]> {}
140
141impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
142    type InternalCollection = Vec<T>;
143
144    fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
145        <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
146    }
147
148    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
149        <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
150    }
151
152    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
153        <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
154            .into_boxed_slice()
155    }
156}
157
158impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
159
160impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
161where
162    U: FromStream<T>,
163{
164    type InternalCollection = Result<U::InternalCollection, E>;
165
166    fn initialize(
167        _: sealed::Internal,
168        lower: usize,
169        upper: Option<usize>,
170    ) -> Result<U::InternalCollection, E> {
171        Ok(U::initialize(sealed::Internal, lower, upper))
172    }
173
174    fn extend(
175        _: sealed::Internal,
176        collection: &mut Self::InternalCollection,
177        item: Result<T, E>,
178    ) -> bool {
179        assert!(collection.is_ok());
180        match item {
181            Ok(item) => {
182                let collection = collection.as_mut().ok().expect("invalid state");
183                U::extend(sealed::Internal, collection, item)
184            }
185            Err(err) => {
186                *collection = Err(err);
187                false
188            }
189        }
190    }
191
192    fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
193        if let Ok(collection) = collection.as_mut() {
194            Ok(U::finalize(sealed::Internal, collection))
195        } else {
196            let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
197
198            Err(res.map(drop).unwrap_err())
199        }
200    }
201}
202
pub(crate) mod sealed {
    // Marker that every `FromStreamPriv` method takes as its first argument.
    // The lint is silenced because this marker carries no `Debug` impl.
    #[allow(missing_debug_implementations)]
    pub struct Internal;

    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate type that items are accumulated into while collecting.
        ///
        /// The name of this type is internal and cannot be relied upon.
        type InternalCollection;

        /// Creates the intermediate collection.
        ///
        /// `lower` and `upper` are the bounds of the stream's size hint.
        fn initialize(
            internal: Internal,
            lower: usize,
            upper: Option<usize>,
        ) -> Self::InternalCollection;

        /// Adds a received item to the intermediate collection.
        ///
        /// Returns `true` to continue streaming, or `false` to complete the
        /// collection early.
        fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

        /// Converts the intermediate collection into the target type.
        fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
    }
}