arrow_csv

Module reader

Source
Expand description

CSV Reader

§Basic Usage

This CSV reader allows CSV files to be read into the Arrow memory model. Records are loaded in batches and are then converted from row-based data to columnar data.

Example:


let schema = Schema::new(vec![
    Field::new("city", DataType::Utf8, false),
    Field::new("lat", DataType::Float64, false),
    Field::new("lng", DataType::Float64, false),
]);

let file = File::open("test/data/uk_cities.csv").unwrap();

let mut csv = ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
let batch = csv.next().unwrap().unwrap();

§Async Usage

The lower-level Decoder can be integrated with various forms of async data streams, and is designed to be agnostic to the various different kinds of async IO primitives found within the Rust ecosystem.

For example, see below for how it can be used with an arbitrary Stream of Bytes

fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            if buffered.is_empty() {
                if let Some(b) = ready!(input.poll_next_unpin(cx)) {
                    buffered = b;
                }
                // Note: don't break on `None` as the decoder needs
                // to be called with an empty array to delimit the
                // final record
            }
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(0) => break,
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            buffered.advance(decoded);
        }

        Poll::Ready(decoder.flush().transpose())
    })
}

In a similar vein, it can also be used with tokio-based IO primitives

fn decode_stream<R: AsyncBufRead + Unpin>(
    mut decoder: Decoder,
    mut reader: R,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    futures::stream::poll_fn(move |cx| {
        loop {
            let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
                Ok(b) => b,
                Err(e) => return Poll::Ready(Some(Err(e.into()))),
            };
            let decoded = match decoder.decode(b) {
                // Note: the decoder needs to be called with an empty
                // array to delimit the final record
                Ok(0) => break,
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            Pin::new(&mut reader).consume(decoded);
        }

        Poll::Ready(decoder.flush().transpose())
    })
}

Structs§

  • CSV file reader
  • A push-based interface for decoding CSV data from an arbitrary byte stream
  • The format specification for the CSV file
  • CSV file reader builder

Functions§

  • Infer the schema of a CSV file by reading through the first n records of the file, with max_read_records controlling the maximum number of records to read.
  • Infer schema of CSV records provided by struct that implements Read trait.
  • Infer schema from a list of CSV files by reading through first n records with max_read_records controlling the maximum number of records to read.

Type Aliases§