gix_features/parallel/
in_order.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{cmp::Ordering, collections::BTreeMap};

/// A counter for items that are in sequence, to be able to put them back into original order later.
pub type SequenceId = usize;

/// An iterator which olds iterated items with a **sequential** ID starting at 0 long enough to dispense them in order.
pub struct InOrderIter<T, I> {
    /// The iterator yielding the out-of-order elements we are to yield in order.
    pub inner: I,
    store: BTreeMap<SequenceId, T>,
    next_chunk: SequenceId,
    is_done: bool,
}

impl<T, E, I> From<I> for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    fn from(iter: I) -> Self {
        InOrderIter {
            inner: iter,
            store: Default::default(),
            next_chunk: 0,
            is_done: false,
        }
    }
}

impl<T, E, I> Iterator for InOrderIter<T, I>
where
    I: Iterator<Item = Result<(SequenceId, T), E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.is_done {
            return None;
        }
        'find_next_in_sequence: loop {
            match self.inner.next() {
                Some(Ok((c, v))) => match c.cmp(&self.next_chunk) {
                    Ordering::Equal => {
                        self.next_chunk += 1;
                        return Some(Ok(v));
                    }
                    Ordering::Less => {
                        unreachable!("in a correctly ordered sequence we can never see keys again, got {}", c)
                    }
                    Ordering::Greater => {
                        let previous = self.store.insert(c, v);
                        assert!(
                            previous.is_none(),
                            "Chunks are returned only once, input is an invalid sequence"
                        );
                        if let Some(v) = self.store.remove(&self.next_chunk) {
                            self.next_chunk += 1;
                            return Some(Ok(v));
                        }
                        continue 'find_next_in_sequence;
                    }
                },
                Some(Err(e)) => {
                    self.is_done = true;
                    self.store.clear();
                    return Some(Err(e));
                }
                None => match self.store.remove(&self.next_chunk) {
                    Some(v) => {
                        self.next_chunk += 1;
                        return Some(Ok(v));
                    }
                    None => {
                        debug_assert!(
                            self.store.is_empty(),
                            "When iteration is done we should not have stored items left"
                        );
                        return None;
                    }
                },
            }
        }
    }
}