broker_tokio/stream/
collect.rs1use crate::stream::Stream;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use core::future::Future;
5use core::mem;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10pin_project! {
12 #[must_use = "streams do nothing unless polled"]
14 #[derive(Debug)]
15 pub struct Collect<T, U>
16 where
17 T: Stream,
18 U: FromStream<T::Item>,
19 {
20 #[pin]
21 stream: T,
22 collection: U::Collection,
23 }
24}
25
/// Conversion from a stream of `T` items into a collection.
///
/// This is a marker trait with no methods of its own: all behaviour lives in
/// the supertrait `sealed::FromStreamPriv`. Because `sealed` is a
/// `pub(crate)` module, only code inside this crate can name the supertrait
/// and therefore only this crate can add implementations.
pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
37
38impl<T, U> Collect<T, U>
39where
40 T: Stream,
41 U: FromStream<T::Item>,
42{
43 pub(super) fn new(stream: T) -> Collect<T, U> {
44 let (lower, upper) = stream.size_hint();
45 let collection = U::initialize(lower, upper);
46
47 Collect { stream, collection }
48 }
49}
50
51impl<T, U> Future for Collect<T, U>
52where
53 T: Stream,
54 U: FromStream<T::Item>,
55{
56 type Output = U;
57
58 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
59 use Poll::Ready;
60
61 loop {
62 let mut me = self.as_mut().project();
63
64 let item = match ready!(me.stream.poll_next(cx)) {
65 Some(item) => item,
66 None => {
67 return Ready(U::finalize(&mut me.collection));
68 }
69 };
70
71 if !U::extend(&mut me.collection, item) {
72 return Ready(U::finalize(&mut me.collection));
73 }
74 }
75 }
76}
77
78impl FromStream<()> for () {}
81
82impl sealed::FromStreamPriv<()> for () {
83 type Collection = ();
84
85 fn initialize(_lower: usize, _upper: Option<usize>) {}
86
87 fn extend(_collection: &mut (), _item: ()) -> bool {
88 true
89 }
90
91 fn finalize(_collection: &mut ()) {}
92}
93
94impl<T: AsRef<str>> FromStream<T> for String {}
95
96impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
97 type Collection = String;
98
99 fn initialize(_lower: usize, _upper: Option<usize>) -> String {
100 String::new()
101 }
102
103 fn extend(collection: &mut String, item: T) -> bool {
104 collection.push_str(item.as_ref());
105 true
106 }
107
108 fn finalize(collection: &mut String) -> String {
109 mem::replace(collection, String::new())
110 }
111}
112
113impl<T> FromStream<T> for Vec<T> {}
114
115impl<T> sealed::FromStreamPriv<T> for Vec<T> {
116 type Collection = Vec<T>;
117
118 fn initialize(lower: usize, _upper: Option<usize>) -> Vec<T> {
119 Vec::with_capacity(lower)
120 }
121
122 fn extend(collection: &mut Vec<T>, item: T) -> bool {
123 collection.push(item);
124 true
125 }
126
127 fn finalize(collection: &mut Vec<T>) -> Vec<T> {
128 mem::replace(collection, vec![])
129 }
130}
131
132impl<T> FromStream<T> for Box<[T]> {}
133
134impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
135 type Collection = Vec<T>;
136
137 fn initialize(lower: usize, upper: Option<usize>) -> Vec<T> {
138 <Vec<T> as sealed::FromStreamPriv<T>>::initialize(lower, upper)
139 }
140
141 fn extend(collection: &mut Vec<T>, item: T) -> bool {
142 <Vec<T> as sealed::FromStreamPriv<T>>::extend(collection, item)
143 }
144
145 fn finalize(collection: &mut Vec<T>) -> Box<[T]> {
146 <Vec<T> as sealed::FromStreamPriv<T>>::finalize(collection).into_boxed_slice()
147 }
148}
149
150impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
151
152impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
153where
154 U: FromStream<T>,
155{
156 type Collection = Result<U::Collection, E>;
157
158 fn initialize(lower: usize, upper: Option<usize>) -> Result<U::Collection, E> {
159 Ok(U::initialize(lower, upper))
160 }
161
162 fn extend(collection: &mut Self::Collection, item: Result<T, E>) -> bool {
163 assert!(collection.is_ok());
164 match item {
165 Ok(item) => {
166 let collection = collection.as_mut().ok().expect("invalid state");
167 U::extend(collection, item)
168 }
169 Err(err) => {
170 *collection = Err(err);
171 false
172 }
173 }
174 }
175
176 fn finalize(collection: &mut Self::Collection) -> Result<U, E> {
177 if let Ok(collection) = collection.as_mut() {
178 Ok(U::finalize(collection))
179 } else {
180 let res = mem::replace(collection, Ok(U::initialize(0, Some(0))));
181
182 if let Err(err) = res {
183 Err(err)
184 } else {
185 unreachable!();
186 }
187 }
188 }
189}
190
191impl<T: Buf> FromStream<T> for Bytes {}
192
193impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
194 type Collection = BytesMut;
195
196 fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
197 BytesMut::new()
198 }
199
200 fn extend(collection: &mut BytesMut, item: T) -> bool {
201 collection.put(item);
202 true
203 }
204
205 fn finalize(collection: &mut BytesMut) -> Bytes {
206 mem::replace(collection, BytesMut::new()).freeze()
207 }
208}
209
210impl<T: Buf> FromStream<T> for BytesMut {}
211
212impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
213 type Collection = BytesMut;
214
215 fn initialize(_lower: usize, _upper: Option<usize>) -> BytesMut {
216 BytesMut::new()
217 }
218
219 fn extend(collection: &mut BytesMut, item: T) -> bool {
220 collection.put(item);
221 true
222 }
223
224 fn finalize(collection: &mut BytesMut) -> BytesMut {
225 mem::replace(collection, BytesMut::new())
226 }
227}
228
pub(crate) mod sealed {
    /// Implementation half of `FromStream`.
    ///
    /// The trait is `pub` so it can appear as a supertrait bound, but it
    /// lives in a `pub(crate)` module, which keeps implementations confined
    /// to this crate.
    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate accumulator that items are gathered into before the
        /// final value is produced.
        type Collection;

        /// Creates an empty accumulator.
        ///
        /// `Collect::new` passes the stream's `size_hint()` as
        /// `lower`/`upper`; implementations may use it to pre-allocate.
        fn initialize(lower: usize, upper: Option<usize>) -> Self::Collection;

        /// Adds `item` to the accumulator.
        ///
        /// Returns `true` to keep receiving items; `false` tells the caller
        /// to stop polling the stream and finalize immediately.
        fn extend(collection: &mut Self::Collection, item: T) -> bool;

        /// Converts the accumulator into the final value.
        ///
        /// Takes `&mut` rather than ownership, so implementations must leave
        /// some valid value behind in `collection`.
        fn finalize(collection: &mut Self::Collection) -> Self;
    }
}