polars_parquet/parquet/write/
dyn_iter.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use crate::parquet::FallibleStreamingIterator;

/// A boxed, type-erased [`Iterator`] yielding items of type `V`.
///
/// The concrete iterator type is hidden behind a `Box<dyn Iterator>`, so values
/// built from different iterator types share the single type `DynIter<'a, V>`.
/// The wrapped iterator must be `Send + Sync` and must outlive `'a`.
pub struct DynIter<'a, V> {
    iter: Box<dyn Iterator<Item = V> + Send + Sync + 'a>,
}

impl<'a, V> DynIter<'a, V> {
    /// Moves `iter` onto the heap and wraps it in a [`DynIter`].
    pub fn new<I>(iter: I) -> Self
    where
        I: Iterator<Item = V> + Send + Sync + 'a,
    {
        // The annotation performs the unsizing coercion from `Box<I>` to the trait object.
        let boxed: Box<dyn Iterator<Item = V> + Send + Sync + 'a> = Box::new(iter);
        DynIter { iter: boxed }
    }
}

impl<V> Iterator for DynIter<'_, V> {
    type Item = V;

    /// Forwards to the boxed iterator's `next`.
    fn next(&mut self) -> Option<V> {
        (*self.iter).next()
    }

    /// Forwards to the boxed iterator's `size_hint`, so its bounds are reported unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (*self.iter).size_hint()
    }
}

/// Dynamically-typed [`FallibleStreamingIterator`].
pub struct DynStreamingIterator<'a, V, E> {
    iter: Box<dyn FallibleStreamingIterator<Item = V, Error = E> + 'a + Send + Sync>,
}

impl<V, E> FallibleStreamingIterator for DynStreamingIterator<'_, V, E> {
    type Item = V;
    type Error = E;

    fn advance(&mut self) -> Result<(), Self::Error> {
        self.iter.advance()
    }

    fn get(&self) -> Option<&Self::Item> {
        self.iter.get()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}

impl<'a, V, E> DynStreamingIterator<'a, V, E> {
    /// Returns a new [`DynStreamingIterator`], boxing the incoming iterator
    pub fn new<I>(iter: I) -> Self
    where
        I: FallibleStreamingIterator<Item = V, Error = E> + 'a + Send + Sync,
    {
        Self {
            iter: Box::new(iter),
        }
    }
}