// polars_json/ndjson/deserialize.rs
use arrow::array::Array;
use arrow::legacy::kernels::concatenate::concatenate_owned_unchecked;
use simd_json::BorrowedValue;

use super::*;

/// Deserializes an iterator of NDJSON rows into an [`Array`][Array] of [`ArrowDataType`].
///
/// [Array]: arrow::array::Array
///
/// # Implementation
/// This function is CPU-bounded.
///
/// The rows are joined into a JSON array (`[row0,row1,...]`) and parsed with `simd_json`.
/// The joined buffer is kept below `u32::MAX` bytes (presumably a size limit of the parser —
/// confirm against `simd_json`). When the next row would reach that limit, the current batch is
/// parsed on its own and a new batch is started. All per-batch arrays are concatenated at the end.
///
/// When every row holds exactly one JSON value (well-formed NDJSON), the returned array has one
/// element per row yielded by `rows`.
///
/// # Arguments
/// * `rows` - the NDJSON rows.
/// * `dtype` - the data type of the returned array.
/// * `buf_size`, `count` - only used to pre-size the internal buffer: `buf_size` bytes of row
///   data plus one separator per each of `count` rows and the two enclosing brackets.
/// * `allow_extra_fields_in_struct` - forwarded unchanged to the JSON deserializer.
///
/// # Errors
/// This function errors if any of the rows is not valid JSON (i.e. the format is not valid
/// NDJSON), or if the parsed values cannot be deserialized into `dtype`.
pub fn deserialize_iter<'a>(
    rows: impl Iterator<Item = &'a str>,
    dtype: ArrowDataType,
    buf_size: usize,
    count: usize,
    allow_extra_fields_in_struct: bool,
) -> PolarsResult<ArrayRef> {
    // Upper bound (exclusive-ish) on the size of one joined batch handed to the parser.
    const MAX_BATCH_LEN: usize = u32::MAX as usize;

    /// Parses one `[row,row,...]` buffer and deserializes its elements into `dtype`.
    fn parse_batch(
        buf: &mut [u8],
        dtype: &ArrowDataType,
        allow_extra_fields_in_struct: bool,
    ) -> PolarsResult<Box<dyn Array>> {
        let value = simd_json::to_borrowed_value(buf)
            .map_err(|e| PolarsError::ComputeError(format!("json parsing error: '{e}'").into()))?;
        match value {
            BorrowedValue::Array(rows) => super::super::json::deserialize::_deserialize(
                &rows,
                dtype.clone(),
                allow_extra_fields_in_struct,
            ),
            // The buffer is always wrapped in `[`..`]`, so a successful parse should yield an
            // array. Report an error instead of panicking if that invariant is ever broken.
            _ => Err(PolarsError::ComputeError(
                "json parsing error: expected the NDJSON rows to form a JSON array"
                    .to_string()
                    .into(),
            )),
        }
    }

    let mut batches: Vec<Box<dyn Array>> = Vec::new();
    // Saturating adds: a huge size hint must not overflow before being clamped.
    let capacity = buf_size
        .saturating_add(count)
        .saturating_add(2)
        .min(MAX_BATCH_LEN);
    let mut buf = Vec::with_capacity(capacity);
    buf.push(b'[');

    // `peekable` lets us look at the next row's size before deciding to flush.
    let mut rows = rows.peekable();
    while let Some(row) = rows.next() {
        buf.extend_from_slice(row.as_bytes());
        buf.push(b',');

        let next_row_len = rows.peek().map_or(0, |next| next.len());
        if buf.len() + next_row_len >= MAX_BATCH_LEN {
            // Replace the trailing `,` with `]` and parse this batch on its own.
            let _ = buf.pop();
            buf.push(b']');
            batches.push(parse_batch(&mut buf, &dtype, allow_extra_fields_in_struct)?);
            buf.clear();
            buf.push(b'[');
        }
    }

    // Drop the trailing `,` if any row is buffered; an empty batch stays `[` and becomes `[]`.
    if buf.len() > 1 {
        let _ = buf.pop();
    }
    buf.push(b']');

    if batches.is_empty() {
        parse_batch(&mut buf, &dtype, allow_extra_fields_in_struct)
    } else {
        batches.push(parse_batch(&mut buf, &dtype, allow_extra_fields_in_struct)?);
        concatenate_owned_unchecked(&batches)
    }
}