Module arrow_json::reader
JSON reader
This JSON reader allows JSON records to be read into the Arrow memory model. Records are loaded in batches and are then converted from the record-oriented representation to the columnar arrow data model.
The reader ignores whitespace between JSON values, including \n and \r, allowing parsing of sequences of one or more arbitrarily formatted JSON values, including but not limited to newline-delimited JSON.
§Basic Usage
Reader can be used directly with synchronous data sources, such as std::fs::File:
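use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

// Schema of the records to read; only "c" may be null.
let schema = Arc::new(Schema::new(vec![
    Field::new("a", DataType::Float64, false),
    Field::new("b", DataType::Float64, false),
    Field::new("c", DataType::Boolean, true),
]));
let file = File::open("test/data/basic.json").unwrap();
let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
// The reader is an iterator of Result<RecordBatch, ArrowError>.
let batch = json.next().unwrap().unwrap();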
§Async Usage
The lower-level Decoder can be integrated with various forms of async data streams, and is designed to be agnostic to the various kinds of async IO primitives found within the Rust ecosystem.
For example, see below for how it can be used with an arbitrary Stream of Bytes:
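use std::task::{ready, Poll};
use arrow_array::RecordBatch;
use arrow_json::reader::Decoder;
use arrow_schema::ArrowError;
use bytes::{Buf, Bytes};
use futures::stream::{Stream, StreamExt};

fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            // Fetch the next chunk once the current one is fully consumed.
            if buffered.is_empty() {
                buffered = match ready!(input.poll_next_unpin(cx)) {
                    Some(b) => b,
                    None => break,
                };
            }
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            let read = buffered.len();
            buffered.advance(decoded);
            // A partial read means the decoder holds a full batch.
            if decoded != read {
                break;
            }
        }
        Poll::Ready(decoder.flush().transpose())
    })
}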
In a similar vein, it can also be used with tokio-based IO primitives
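use std::pin::Pin;
use std::task::{ready, Poll};
use arrow_array::RecordBatch;
use arrow_json::reader::Decoder;
use arrow_schema::ArrowError;
use futures::stream::Stream;
use tokio::io::AsyncBufRead;

fn decode_stream<R: AsyncBufRead + Unpin>(
    mut decoder: Decoder,
    mut reader: R,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    futures::stream::poll_fn(move |cx| {
        loop {
            // An empty buffer signals end of input.
            let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
                Ok(b) if b.is_empty() => break,
                Ok(b) => b,
                Err(e) => return Poll::Ready(Some(Err(e.into()))),
            };
            let read = b.len();
            let decoded = match decoder.decode(b) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            Pin::new(&mut reader).consume(decoded);
            // A partial read means the decoder holds a full batch.
            if decoded != read {
                break;
            }
        }
        Poll::Ready(decoder.flush().transpose())
    })
}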
Structs§
- A low-level interface for reading JSON data from a byte stream
- Reads JSON data with a known schema directly into arrow RecordBatch
- JSON file reader that produces a serde_json::Value iterator from a Read trait
Functions§
- Infer the fields of a JSON file by reading the first n records of the buffer, with max_read_records controlling the maximum number of records to read.
- Infer the fields of a JSON file by reading all items from the JSON Value Iterator.
- Infer the fields of a JSON file by reading the first n records of the file, with max_read_records controlling the maximum number of records to read.