ordered_stream/
adapters.rs

1use crate::*;
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::{FusedStream, Stream};
6
7/// Helpers for chaining [`OrderedStream`]s.
8pub trait OrderedStreamExt: OrderedStream {
9    /// Apply a closure to the data.
10    ///
11    /// This does not change the ordering.
12    fn map<F, R>(self, f: F) -> Map<Self, F>
13    where
14        Self: Sized,
15        F: FnMut(Self::Data) -> R,
16    {
17        Map { stream: self, f }
18    }
19
20    /// Apply a closure to the items that has access to the ordering data.
21    fn map_item<F, R>(self, f: F) -> MapItem<Self, F>
22    where
23        Self: Sized,
24        F: FnMut(&Self::Ordering, Self::Data) -> R,
25    {
26        MapItem { stream: self, f }
27    }
28
29    /// Apply a closure to the items that can change the type of the ordering value.
30    ///
31    /// A bidirectional mapping for ordering values is required in order to remap `before` values.
32    /// It is the caller's responsibility to ensure that the items in the mapped stream still meet
33    /// the ordering requirements that [`OrderedStream`] expects.
34    fn map_ordering<NewOrdering, NewData, MapInto, MapFrom>(
35        self,
36        map_into: MapInto,
37        map_from: MapFrom,
38    ) -> MapOrdering<Self, MapInto, MapFrom>
39    where
40        Self: Sized,
41        MapInto: FnMut(Self::Ordering, Self::Data) -> (NewOrdering, NewData),
42        MapFrom: FnMut(&NewOrdering) -> Option<Self::Ordering>,
43        NewOrdering: Ord,
44    {
45        MapOrdering {
46            stream: self,
47            map_into,
48            map_from,
49        }
50    }
51
52    fn filter<F>(self, filter: F) -> Filter<Self, F>
53    where
54        Self: Sized,
55        F: FnMut(&Self::Data) -> bool,
56    {
57        Filter {
58            stream: self,
59            filter,
60        }
61    }
62
63    fn filter_map<F, R>(self, filter: F) -> FilterMap<Self, F>
64    where
65        Self: Sized,
66        F: FnMut(Self::Data) -> Option<R>,
67    {
68        FilterMap {
69            stream: self,
70            filter,
71        }
72    }
73
74    /// Apply a closure that produces a [`Future`] to items, running the future on each item in
75    /// sequence before processing the next.
76    ///
77    /// This has the side effect of buffering items that are not before the requested ordering
78    /// point; you can use [`ready`](core::future::ready) as the closure to take advantage of this
79    /// behavior if you don't want to buffer items yourself.
80    fn then<F, Fut>(self, then: F) -> Then<Self, F, Fut>
81    where
82        Self: Sized,
83        F: FnMut(Self::Data) -> Fut,
84        Fut: Future,
85    {
86        Then {
87            stream: self,
88            then,
89            future: ThenItem::Idle,
90        }
91    }
92
93    /// Convert this into a [`Stream`], discarding the ordering information.
94    fn into_stream(self) -> IntoStream<Self>
95    where
96        Self: Sized,
97    {
98        IntoStream { stream: self }
99    }
100
101    /// Convert this into a [`Stream`], keeping the ordering objects.
102    fn into_tuple_stream(self) -> IntoTupleStream<Self>
103    where
104        Self: Sized,
105    {
106        IntoTupleStream { stream: self }
107    }
108
109    /// Convert this into a [`Stream`], keeping only the ordering objects.
110    fn into_ordering(self) -> IntoOrdering<Self>
111    where
112        Self: Sized,
113    {
114        IntoOrdering { stream: self }
115    }
116
117    /// Return the next item in this stream.
118    fn next(&mut self) -> Next<'_, Self>
119    where
120        Self: Unpin,
121    {
122        Next {
123            stream: Pin::new(self),
124        }
125    }
126
127    /// Return a [`PollResult`] corresponding to the next item in the stream.
128    fn next_before<'a>(&'a mut self, before: Option<&'a Self::Ordering>) -> NextBefore<'a, Self>
129    where
130        Self: Unpin,
131    {
132        NextBefore {
133            stream: Pin::new(self),
134            before,
135        }
136    }
137
138    fn peekable(self) -> Peekable<Self>
139    where
140        Self: Sized,
141    {
142        Peekable {
143            stream: self,
144            item: None,
145            is_terminated: false,
146        }
147    }
148}
149
150impl<T: ?Sized + OrderedStream> OrderedStreamExt for T {}
151
152pin_project_lite::pin_project! {
153    /// An [`OrderedStream`] wrapper around a [`Stream`].
154    ///
155    /// This does not use any future or past knowledge of elements, and so is suitable if the
156    /// stream rarely or never blocks.  Prefer using [`FromStream`] if you plan to filter or join
157    /// this stream and want other streams to be able to make progress while this one blocks.
158    #[derive(Debug)]
159    pub struct FromStreamDirect<S, F> {
160        #[pin]
161        stream: S,
162        split_item: F,
163    }
164}
165
166impl<S, F> FromStreamDirect<S, F> {
167    /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
168    /// produced by the original stream.
169    pub fn new<Ordering, Data>(stream: S, split_item: F) -> Self
170    where
171        S: Stream,
172        F: FnMut(S::Item) -> (Ordering, Data),
173        Ordering: Ord,
174    {
175        Self { stream, split_item }
176    }
177
178    /// Helper function to simplify the creation of a stream when you have a get_ordering function.
179    pub fn with_ordering<Ordering>(
180        stream: S,
181        mut get_ordering: F,
182    ) -> FromStreamDirect<S, impl FnMut(S::Item) -> (Ordering, S::Item)>
183    where
184        S: Stream,
185        F: FnMut(&S::Item) -> Ordering,
186        Ordering: Ord,
187    {
188        FromStreamDirect::new(stream, move |data| {
189            let ordering = get_ordering(&data);
190            (ordering, data)
191        })
192    }
193}
194
195impl<S, F, Ordering, Data> OrderedStream for FromStreamDirect<S, F>
196where
197    S: Stream,
198    F: FnMut(S::Item) -> (Ordering, Data),
199    Ordering: Ord,
200{
201    type Data = Data;
202    type Ordering = Ordering;
203
204    fn poll_next_before(
205        self: Pin<&mut Self>,
206        cx: &mut Context<'_>,
207        _: Option<&Self::Ordering>,
208    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
209        let this = self.project();
210        let split_item = this.split_item;
211        this.stream.poll_next(cx).map(|opt| match opt {
212            None => PollResult::Terminated,
213            Some(data) => {
214                let (ordering, data) = split_item(data);
215                PollResult::Item { data, ordering }
216            }
217        })
218    }
219
220    fn size_hint(&self) -> (usize, Option<usize>) {
221        self.stream.size_hint()
222    }
223}
224
225impl<S, F, Ordering, Data> FusedOrderedStream for FromStreamDirect<S, F>
226where
227    S: FusedStream,
228    F: FnMut(S::Item) -> (Ordering, Data),
229    Ordering: Ord,
230{
231    fn is_terminated(&self) -> bool {
232        self.stream.is_terminated()
233    }
234}
235
236pin_project_lite::pin_project! {
237    /// An [`OrderedStream`] wrapper around a [`Stream`].
238    ///
239    /// Unlike [`FromStream`], the items in the [`Stream`] are themselves ordered with no
240    /// additional data.
241    #[derive(Debug)]
242    pub struct FromSortedStream<S> {
243        #[pin]
244        pub stream: S,
245    }
246}
247
248impl<S> FromSortedStream<S> {
249    /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
250    /// produced by the original stream.
251    pub fn new(stream: S) -> Self
252    where
253        S: Stream,
254        S::Item: Ord,
255    {
256        Self { stream }
257    }
258}
259
260impl<S> OrderedStream for FromSortedStream<S>
261where
262    S: Stream,
263    S::Item: Ord,
264{
265    type Data = ();
266    type Ordering = S::Item;
267
268    fn poll_next_before(
269        self: Pin<&mut Self>,
270        cx: &mut Context<'_>,
271        _: Option<&Self::Ordering>,
272    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
273        let this = self.project();
274        this.stream.poll_next(cx).map(|opt| match opt {
275            None => PollResult::Terminated,
276            Some(ordering) => PollResult::Item { data: (), ordering },
277        })
278    }
279
280    fn size_hint(&self) -> (usize, Option<usize>) {
281        self.stream.size_hint()
282    }
283}
284
285impl<S> FusedOrderedStream for FromSortedStream<S>
286where
287    S: FusedStream,
288    S::Item: Ord,
289{
290    fn is_terminated(&self) -> bool {
291        self.stream.is_terminated()
292    }
293}
294
295pin_project_lite::pin_project! {
296    /// An [`OrderedStream`] wrapper around a [`Stream`].
297    ///
298    /// This caches the last-used ordering point returned by the stream and uses it to produce
299    /// NoneBefore results.  This makes it suitable for using to adapt streams that are filtered
300    /// or mapped before joining.  It still relies on the original stream producing a later-ordered
301    /// element to allow other streams to progress, however.
302    #[derive(Debug)]
303    pub struct FromStream<S, F, Ordering> {
304        #[pin]
305        stream: S,
306        split_item: F,
307        last: Option<Ordering>,
308    }
309}
310
311impl<S, F, Ordering> FromStream<S, F, Ordering>
312where
313    S: Stream,
314    Ordering: Ord + Clone,
315{
316    /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
317    /// produced by the original stream.
318    pub fn new<Data>(stream: S, split_item: F) -> Self
319    where
320        F: FnMut(S::Item) -> (Ordering, Data),
321    {
322        FromStream {
323            stream,
324            split_item,
325            last: None,
326        }
327    }
328
329    /// Helper function to simplify the creation of a stream when you have a get_ordering function.
330    pub fn with_ordering(
331        stream: S,
332        mut get_ordering: F,
333    ) -> FromStream<S, impl FnMut(S::Item) -> (Ordering, S::Item), Ordering>
334    where
335        F: FnMut(&S::Item) -> Ordering,
336    {
337        FromStream::new(stream, move |data| {
338            let ordering = get_ordering(&data);
339            (ordering, data)
340        })
341    }
342}
343
344impl<S, F, Ordering, Data> OrderedStream for FromStream<S, F, Ordering>
345where
346    S: Stream,
347    F: FnMut(S::Item) -> (Ordering, Data),
348    Ordering: Ord + Clone,
349{
350    type Data = Data;
351    type Ordering = Ordering;
352
353    fn poll_next_before(
354        self: Pin<&mut Self>,
355        cx: &mut Context<'_>,
356        before: Option<&Self::Ordering>,
357    ) -> Poll<PollResult<Ordering, Data>> {
358        let this = self.project();
359        let split_item = this.split_item;
360        let last = this.last;
361        if let (Some(last), Some(before)) = (last.as_ref(), before) {
362            if last >= before {
363                return Poll::Ready(PollResult::NoneBefore);
364            }
365        }
366        this.stream.poll_next(cx).map(|opt| match opt {
367            None => PollResult::Terminated,
368            Some(item) => {
369                let (ordering, data) = split_item(item);
370                *last = Some(ordering.clone());
371                PollResult::Item { data, ordering }
372            }
373        })
374    }
375
376    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
377        self.last.as_ref().map(MaybeBorrowed::Borrowed)
378    }
379
380    fn size_hint(&self) -> (usize, Option<usize>) {
381        self.stream.size_hint()
382    }
383}
384
385impl<S, F, Ordering, Data> FusedOrderedStream for FromStream<S, F, Ordering>
386where
387    S: FusedStream,
388    F: FnMut(S::Item) -> (Ordering, Data),
389    Ordering: Ord + Clone,
390{
391    fn is_terminated(&self) -> bool {
392        self.stream.is_terminated()
393    }
394}
395
396pin_project_lite::pin_project! {
397    /// A [`Stream`] for the [`into_stream`](OrderedStreamExt::into_stream) function.
398    #[derive(Debug)]
399    pub struct IntoStream<S> {
400        #[pin]
401        stream: S,
402    }
403}
404
405impl<S: OrderedStream> Stream for IntoStream<S> {
406    type Item = S::Data;
407
408    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
409        self.project()
410            .stream
411            .poll_next_before(cx, None)
412            .map(|r| r.into_data())
413    }
414
415    fn size_hint(&self) -> (usize, Option<usize>) {
416        self.stream.size_hint()
417    }
418}
419
420impl<S> FusedStream for IntoStream<S>
421where
422    S: FusedOrderedStream,
423{
424    fn is_terminated(&self) -> bool {
425        self.stream.is_terminated()
426    }
427}
428
429pin_project_lite::pin_project! {
430    /// A [`Stream`] for the [`into_tuple_stream`](OrderedStreamExt::into_tuple_stream) function.
431    #[derive(Debug)]
432    pub struct IntoTupleStream<S> {
433        #[pin]
434        stream: S,
435    }
436}
437
438impl<S: OrderedStream> Stream for IntoTupleStream<S> {
439    type Item = (S::Ordering, S::Data);
440
441    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
442        self.project()
443            .stream
444            .poll_next_before(cx, None)
445            .map(|r| r.into_tuple())
446    }
447
448    fn size_hint(&self) -> (usize, Option<usize>) {
449        self.stream.size_hint()
450    }
451}
452
453impl<S> FusedStream for IntoTupleStream<S>
454where
455    S: FusedOrderedStream,
456{
457    fn is_terminated(&self) -> bool {
458        self.stream.is_terminated()
459    }
460}
461
462pin_project_lite::pin_project! {
463    /// A [`Stream`] for the [`into_ordering`](OrderedStreamExt::into_ordering) function.
464    #[derive(Debug)]
465    pub struct IntoOrdering<S> {
466        #[pin]
467        stream: S,
468    }
469}
470
471impl<S: OrderedStream> Stream for IntoOrdering<S> {
472    type Item = S::Ordering;
473
474    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
475        self.project()
476            .stream
477            .poll_next_before(cx, None)
478            .map(|r| r.into_tuple().map(|t| t.0))
479    }
480
481    fn size_hint(&self) -> (usize, Option<usize>) {
482        self.stream.size_hint()
483    }
484}
485
486impl<S> FusedStream for IntoOrdering<S>
487where
488    S: FusedOrderedStream,
489{
490    fn is_terminated(&self) -> bool {
491        self.stream.is_terminated()
492    }
493}
494
495pin_project_lite::pin_project! {
496    /// An [`OrderedStream`] wrapper around an [`OrderedFuture`].
497    #[derive(Debug)]
498    pub struct FromFuture<F> {
499        #[pin]
500        future: Option<F>,
501    }
502}
503
504impl<F: OrderedFuture> From<F> for FromFuture<F> {
505    fn from(future: F) -> Self {
506        Self {
507            future: Some(future),
508        }
509    }
510}
511
512impl<F: OrderedFuture> OrderedStream for FromFuture<F> {
513    type Data = F::Output;
514    type Ordering = F::Ordering;
515
516    fn poll_next_before(
517        self: Pin<&mut Self>,
518        cx: &mut Context<'_>,
519        before: Option<&Self::Ordering>,
520    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
521        let mut this = self.project();
522        match this.future.as_mut().as_pin_mut() {
523            Some(future) => match future.poll_before(cx, before) {
524                Poll::Ready(Some((ordering, data))) => {
525                    this.future.set(None);
526                    Poll::Ready(PollResult::Item { data, ordering })
527                }
528                Poll::Ready(None) => Poll::Ready(PollResult::NoneBefore),
529                Poll::Pending => Poll::Pending,
530            },
531            None => Poll::Ready(PollResult::Terminated),
532        }
533    }
534
535    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
536        self.future.as_ref().and_then(|f| f.position_hint())
537    }
538
539    fn size_hint(&self) -> (usize, Option<usize>) {
540        if self.future.is_some() {
541            (1, Some(1))
542        } else {
543            (0, Some(0))
544        }
545    }
546}
547
548impl<F: OrderedFuture> FusedOrderedStream for FromFuture<F> {
549    fn is_terminated(&self) -> bool {
550        self.future.is_none()
551    }
552}
553
554pin_project_lite::pin_project! {
555    /// A stream for the [`map`](OrderedStreamExt::map) function.
556    #[derive(Debug)]
557    pub struct Map<S, F> {
558        #[pin]
559        stream: S,
560        f: F,
561    }
562}
563
564impl<S, F> Map<S, F> {
565    /// Convert to source stream.
566    pub fn into_inner(self) -> S {
567        self.stream
568    }
569}
570
571impl<S, F, R> OrderedStream for Map<S, F>
572where
573    S: OrderedStream,
574    F: FnMut(S::Data) -> R,
575{
576    type Data = R;
577    type Ordering = S::Ordering;
578
579    fn poll_next_before(
580        self: Pin<&mut Self>,
581        cx: &mut Context<'_>,
582        before: Option<&Self::Ordering>,
583    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
584        let this = self.project();
585        let f = this.f;
586        this.stream
587            .poll_next_before(cx, before)
588            .map(|res| res.map_data(f))
589    }
590
591    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
592        self.stream.position_hint()
593    }
594
595    fn size_hint(&self) -> (usize, Option<usize>) {
596        self.stream.size_hint()
597    }
598}
599
600pin_project_lite::pin_project! {
601    /// A stream for the [`map_item`](OrderedStreamExt::map_item) function.
602    #[derive(Debug)]
603    pub struct MapItem<S, F> {
604        #[pin]
605        stream: S,
606        f: F,
607    }
608}
609
610impl<S, F> MapItem<S, F> {
611    /// Convert to source stream.
612    pub fn into_inner(self) -> S {
613        self.stream
614    }
615}
616
617impl<S, F, R> OrderedStream for MapItem<S, F>
618where
619    S: OrderedStream,
620    F: FnMut(&S::Ordering, S::Data) -> R,
621{
622    type Data = R;
623    type Ordering = S::Ordering;
624
625    fn poll_next_before(
626        self: Pin<&mut Self>,
627        cx: &mut Context<'_>,
628        before: Option<&Self::Ordering>,
629    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
630        let this = self.project();
631        let f = this.f;
632        this.stream
633            .poll_next_before(cx, before)
634            .map(|res| match res {
635                PollResult::Item { data, ordering } => {
636                    let data = f(&ordering, data);
637                    PollResult::Item { data, ordering }
638                }
639                PollResult::NoneBefore => PollResult::NoneBefore,
640                PollResult::Terminated => PollResult::Terminated,
641            })
642    }
643
644    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
645        self.stream.position_hint()
646    }
647
648    fn size_hint(&self) -> (usize, Option<usize>) {
649        self.stream.size_hint()
650    }
651}
652
653pin_project_lite::pin_project! {
654    /// A stream for the [`map_ordering`](OrderedStreamExt::map_ordering) function.
655    #[derive(Debug)]
656    pub struct MapOrdering<S, MapInto, MapFrom> {
657        #[pin]
658        stream: S,
659        map_into: MapInto, map_from: MapFrom,
660    }
661}
662
663impl<S, I, F> MapOrdering<S, I, F> {
664    /// Convert to source stream.
665    pub fn into_inner(self) -> S {
666        self.stream
667    }
668}
669
670impl<S, MapInto, MapFrom, NewOrdering, NewData> OrderedStream for MapOrdering<S, MapInto, MapFrom>
671where
672    S: OrderedStream,
673    MapInto: FnMut(S::Ordering, S::Data) -> (NewOrdering, NewData),
674    MapFrom: FnMut(&NewOrdering) -> Option<S::Ordering>,
675    NewOrdering: Ord,
676{
677    type Data = NewData;
678    type Ordering = NewOrdering;
679
680    fn poll_next_before(
681        self: Pin<&mut Self>,
682        cx: &mut Context<'_>,
683        before: Option<&Self::Ordering>,
684    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
685        let this = self.project();
686        let map_into = this.map_into;
687        let before = before.and_then(this.map_from);
688        this.stream
689            .poll_next_before(cx, before.as_ref())
690            .map(|res| match res {
691                PollResult::Item { data, ordering } => {
692                    let (ordering, data) = map_into(ordering, data);
693                    PollResult::Item { data, ordering }
694                }
695                PollResult::NoneBefore => PollResult::NoneBefore,
696                PollResult::Terminated => PollResult::Terminated,
697            })
698    }
699
700    fn size_hint(&self) -> (usize, Option<usize>) {
701        self.stream.size_hint()
702    }
703}
704
705pin_project_lite::pin_project! {
706    /// A stream for the [`filter`](OrderedStreamExt::filter) function.
707    #[derive(Debug)]
708    pub struct Filter<S, F> {
709        #[pin]
710        stream: S,
711        filter: F,
712    }
713}
714
715impl<S, F> Filter<S, F> {
716    /// Convert to source stream.
717    pub fn into_inner(self) -> S {
718        self.stream
719    }
720}
721
722impl<S, F> OrderedStream for Filter<S, F>
723where
724    S: OrderedStream,
725    F: FnMut(&S::Data) -> bool,
726{
727    type Data = S::Data;
728    type Ordering = S::Ordering;
729
730    fn poll_next_before(
731        self: Pin<&mut Self>,
732        cx: &mut Context<'_>,
733        before: Option<&Self::Ordering>,
734    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
735        let mut this = self.project();
736        loop {
737            match this.stream.as_mut().poll_next_before(cx, before).into() {
738                PollState::Pending => return Poll::Pending,
739                PollState::Terminated => return Poll::Ready(PollResult::Terminated),
740                PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
741                PollState::Item(data, ordering) => {
742                    if (this.filter)(&data) {
743                        return Poll::Ready(PollResult::Item { data, ordering });
744                    }
745                }
746            }
747        }
748    }
749
750    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
751        self.stream.position_hint()
752    }
753
754    fn size_hint(&self) -> (usize, Option<usize>) {
755        (0, self.stream.size_hint().1)
756    }
757}
758
759pin_project_lite::pin_project! {
760    /// A stream for the [`filter_map`](OrderedStreamExt::filter_map) function.
761    #[derive(Debug)]
762    pub struct FilterMap<S, F> {
763        #[pin]
764        stream: S,
765        filter: F,
766    }
767}
768
769impl<S, F> FilterMap<S, F> {
770    /// Convert to source stream.
771    pub fn into_inner(self) -> S {
772        self.stream
773    }
774}
775
776impl<S, F, R> OrderedStream for FilterMap<S, F>
777where
778    S: OrderedStream,
779    F: FnMut(S::Data) -> Option<R>,
780{
781    type Data = R;
782    type Ordering = S::Ordering;
783
784    fn poll_next_before(
785        self: Pin<&mut Self>,
786        cx: &mut Context<'_>,
787        before: Option<&Self::Ordering>,
788    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
789        let mut this = self.project();
790        loop {
791            match this.stream.as_mut().poll_next_before(cx, before).into() {
792                PollState::Pending => return Poll::Pending,
793                PollState::Terminated => return Poll::Ready(PollResult::Terminated),
794                PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
795                PollState::Item(data, ordering) => match (this.filter)(data) {
796                    Some(data) => return Poll::Ready(PollResult::Item { data, ordering }),
797                    None => continue,
798                },
799            }
800        }
801    }
802
803    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
804        self.stream.position_hint()
805    }
806
807    fn size_hint(&self) -> (usize, Option<usize>) {
808        (0, self.stream.size_hint().1)
809    }
810}
811
812pin_project_lite::pin_project! {
813    #[project = ThenProj]
814    #[project_replace = ThenDone]
815    #[derive(Debug)]
816    enum ThenItem<Fut, T> {
817        Running { #[pin] future: Fut, ordering: T },
818        Idle,
819    }
820}
821
822pin_project_lite::pin_project! {
823    /// A stream for the [`then`](OrderedStreamExt::then) function.
824    #[derive(Debug)]
825    pub struct Then<S, F, Fut>
826        where S: OrderedStream
827    {
828        #[pin]
829        stream: S,
830        then: F,
831        #[pin]
832        future: ThenItem<Fut, S::Ordering>,
833    }
834}
835
836impl<S, F, Fut> OrderedStream for Then<S, F, Fut>
837where
838    S: OrderedStream,
839    F: FnMut(S::Data) -> Fut,
840    Fut: Future,
841{
842    type Data = Fut::Output;
843    type Ordering = S::Ordering;
844
845    fn poll_next_before(
846        self: Pin<&mut Self>,
847        cx: &mut Context<'_>,
848        before: Option<&Self::Ordering>,
849    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
850        let mut this = self.project();
851        loop {
852            if let ThenProj::Running { future, ordering } = this.future.as_mut().project() {
853                // Because we know the next ordering, we can answer questions about it now.
854                if let Some(before) = before {
855                    if *ordering >= *before {
856                        return Poll::Ready(PollResult::NoneBefore);
857                    }
858                }
859
860                if let Poll::Ready(data) = future.poll(cx) {
861                    if let ThenDone::Running { ordering, .. } =
862                        this.future.as_mut().project_replace(ThenItem::Idle)
863                    {
864                        return Poll::Ready(PollResult::Item { data, ordering });
865                    }
866                } else {
867                    return Poll::Pending;
868                }
869            }
870            match this.stream.as_mut().poll_next_before(cx, before).into() {
871                PollState::Pending => return Poll::Pending,
872                PollState::Terminated => return Poll::Ready(PollResult::Terminated),
873                PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
874                PollState::Item(data, ordering) => {
875                    this.future.set(ThenItem::Running {
876                        future: (this.then)(data),
877                        ordering,
878                    });
879                }
880            }
881        }
882    }
883
884    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
885        match &self.future {
886            ThenItem::Running { ordering, .. } => Some(MaybeBorrowed::Borrowed(ordering)),
887            ThenItem::Idle => self.stream.position_hint(),
888        }
889    }
890
891    fn size_hint(&self) -> (usize, Option<usize>) {
892        let (min, max) = self.stream.size_hint();
893        match self.future {
894            ThenItem::Running { .. } => (min.saturating_add(1), max.and_then(|v| v.checked_add(1))),
895            ThenItem::Idle => (min, max),
896        }
897    }
898}
899
900/// A future for the [`next`](OrderedStreamExt::next) function.
901#[derive(Debug)]
902pub struct Next<'a, S: ?Sized> {
903    stream: Pin<&'a mut S>,
904}
905
906impl<'a, S: ?Sized> Unpin for Next<'a, S> {}
907
908impl<'a, S> Future for Next<'a, S>
909where
910    S: OrderedStream + ?Sized,
911{
912    type Output = Option<S::Data>;
913
914    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Data>> {
915        self.stream
916            .as_mut()
917            .poll_next_before(cx, None)
918            .map(PollResult::into_data)
919    }
920}
921
922/// A future for the [`next_before`](OrderedStreamExt::next_before) function.
923#[derive(Debug)]
924pub struct NextBefore<'a, S>
925where
926    S: OrderedStream + ?Sized,
927{
928    stream: Pin<&'a mut S>,
929    before: Option<&'a S::Ordering>,
930}
931
932impl<'a, S: OrderedStream + ?Sized> Unpin for NextBefore<'a, S> {}
933
934impl<'a, S> Future for NextBefore<'a, S>
935where
936    S: OrderedStream + ?Sized,
937{
938    type Output = PollResult<S::Ordering, S::Data>;
939
940    fn poll(
941        mut self: Pin<&mut Self>,
942        cx: &mut Context<'_>,
943    ) -> Poll<PollResult<S::Ordering, S::Data>> {
944        let before = self.before;
945        self.stream.as_mut().poll_next_before(cx, before)
946    }
947}
948
949pin_project_lite::pin_project! {
950    /// A stream for the [`peekable`](OrderedStreamExt::peekable) function.
951    #[derive(Debug)]
952    pub struct Peekable<S: OrderedStream> {
953        #[pin]
954        stream: S,
955        is_terminated: bool,
956        item: Option<(S::Ordering, S::Data)>,
957    }
958}
959
960impl<S: OrderedStream> Peekable<S> {
961    /// Convert into the source stream.
962    ///
963    /// This method returns the source stream along with any buffered item and its
964    /// ordering.
965    pub fn into_inner(self) -> (S, Option<(S::Data, S::Ordering)>) {
966        (self.stream, self.item.map(|(o, d)| (d, o)))
967    }
968
969    /// The current item, without polling
970    pub(crate) fn item(&self) -> Option<&(S::Ordering, S::Data)> {
971        self.item.as_ref()
972    }
973
974    /// Peek on the next item in the stream
975    pub fn poll_peek_before(
976        self: Pin<&mut Self>,
977        cx: &mut Context<'_>,
978        before: Option<&S::Ordering>,
979    ) -> Poll<PollResult<&S::Ordering, &mut S::Data>> {
980        let mut this = self.project();
981        if *this.is_terminated {
982            return Poll::Ready(PollResult::Terminated);
983        }
984        let stream = this.stream.as_mut();
985        if this.item.is_none() {
986            match stream.poll_next_before(cx, before) {
987                Poll::Ready(PollResult::Item { ordering, data }) => {
988                    *this.item = Some((ordering, data));
989                }
990                Poll::Ready(PollResult::NoneBefore) => return Poll::Ready(PollResult::NoneBefore),
991                Poll::Ready(PollResult::Terminated) => {
992                    *this.is_terminated = true;
993                    return Poll::Ready(PollResult::Terminated);
994                }
995                Poll::Pending => return Poll::Pending,
996            }
997        }
998        let item = this.item.as_mut().unwrap();
999        Poll::Ready(PollResult::Item {
1000            ordering: &item.0,
1001            data: &mut item.1,
1002        })
1003    }
1004}
1005
1006impl<S: OrderedStream> OrderedStream for Peekable<S> {
1007    type Ordering = S::Ordering;
1008    type Data = S::Data;
1009
1010    fn poll_next_before(
1011        mut self: Pin<&mut Self>,
1012        cx: &mut Context<'_>,
1013        before: Option<&S::Ordering>,
1014    ) -> Poll<PollResult<S::Ordering, S::Data>> {
1015        match self.as_mut().poll_peek_before(cx, before) {
1016            Poll::Ready(PollResult::Item { .. }) => {
1017                let (ordering, data) = self.project().item.take().unwrap();
1018                Poll::Ready(PollResult::Item { ordering, data })
1019            }
1020            Poll::Ready(PollResult::NoneBefore) => Poll::Ready(PollResult::NoneBefore),
1021            Poll::Ready(PollResult::Terminated) => Poll::Ready(PollResult::Terminated),
1022            Poll::Pending => Poll::Pending,
1023        }
1024    }
1025
1026    fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
1027        match &self.item {
1028            Some((ordering, _)) => Some(MaybeBorrowed::Borrowed(ordering)),
1029            None => self.stream.position_hint(),
1030        }
1031    }
1032
1033    fn size_hint(&self) -> (usize, Option<usize>) {
1034        let (min, max) = if self.is_terminated {
1035            (0, Some(0))
1036        } else {
1037            self.stream.size_hint()
1038        };
1039        if self.item.is_some() {
1040            (min.saturating_add(1), max.and_then(|v| v.checked_add(1)))
1041        } else {
1042            (min, max)
1043        }
1044    }
1045}
1046
1047impl<S: OrderedStream> FusedOrderedStream for Peekable<S> {
1048    fn is_terminated(&self) -> bool {
1049        self.is_terminated
1050    }
1051}