Struct csv_async::AsyncReader

pub struct AsyncReader<R>(/* private fields */);

An already configured CSV reader.

A CSV reader takes CSV data as input and transforms it into standard Rust values. The reader reads CSV data as a sequence of records, where a record is a sequence of fields and each field is a string.

§Configuration

A CSV reader has a convenient constructor method, from_reader. However, if you want to configure the CSV reader to use a different delimiter or quote character (among many other things), then you should use an AsyncReaderBuilder to construct an AsyncReader. For example, to change the field delimiter:

use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReaderBuilder;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city;country;pop
Boston;United States;4628910
";
    let mut rdr = AsyncReaderBuilder::new()
        .delimiter(b';')
        .create_reader(data.as_bytes());

    let mut records = rdr.records();
    assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
    Ok(())
}

§Error handling

In general, CSV parsing does not ever return an error. That is, there is no such thing as malformed CSV data. Instead, this reader will prioritize finding a parse over rejecting CSV data that it does not understand. This choice was inspired by other popular CSV parsers, but also because it is pragmatic. CSV data varies wildly, so even if the CSV data is malformed, it might still be possible to work with the data. In the land of CSV, there is no “right” or “wrong,” only “right” and “less right.”

With that said, a number of errors can occur while reading CSV data:

  • By default, all records in CSV data must have the same number of fields. If a record is found with a different number of fields than a prior record, then an error is returned. This behavior can be disabled by enabling flexible parsing via the flexible method on AsyncReaderBuilder.
  • When reading CSV data from a resource (like a file), it is possible for reading from the underlying resource to fail. This will return an error. After such an error, subsequent calls to the reader behave as if end of file had been reached (unless seek is used). This avoids infinite loops in callers that keep requesting the next record after one has failed.
  • When reading CSV data into String or &str fields (e.g., via a StringRecord), UTF-8 is strictly enforced. If CSV data is invalid UTF-8, then an error is returned. If you want to read invalid UTF-8, then you should use the byte oriented APIs such as ByteRecord. If you need explicit support for another encoding entirely, then you’ll need to use another crate to transcode your CSV data to UTF-8 before parsing it.
  • When using Serde to deserialize CSV data into Rust types, it is possible for a number of additional errors to occur. For example, deserializing a field xyz into an i32 field will result in an error.
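
The first bullet above (equal field counts unless flexible parsing is enabled) can be illustrated with a small standard-library sketch. The helper name `check_field_counts` is hypothetical and is not part of csv_async; it also assumes that fields never contain quoted delimiters or newlines, which a real CSV parser must handle.

```rust
/// Hypothetical helper (not part of csv_async): verifies that every record
/// has the same number of fields as the first one, mimicking the default,
/// non-flexible behavior.
///
/// Simplifying assumption: no quoted delimiters or newlines inside fields,
/// so a plain `split` is enough for this illustration.
fn check_field_counts(data: &str, delimiter: char) -> Result<usize, String> {
    let mut expected: Option<usize> = None;
    for (index, line) in data.lines().enumerate() {
        let found = line.split(delimiter).count();
        if let Some(n) = expected {
            if n != found {
                return Err(format!(
                    "record {} has {} fields, but earlier records have {}",
                    index, found, n
                ));
            }
        } else {
            // The first record fixes the expected field count.
            expected = Some(found);
        }
    }
    // Empty input has no records, so report zero fields.
    Ok(expected.unwrap_or(0))
}
```

With this sketch, `check_field_counts("a,b,c\n1,2\n", ',')` yields an `Err`, which is the situation that flexible parsing is meant to tolerate.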

For more details on the precise semantics of errors, see the Error type.
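
The UTF-8 and typed-deserialization cases from the list above can be reproduced with the standard library alone. This is only a sketch of the underlying conversions, not of how csv_async or Serde implement them; the helper names `field_as_str` and `field_as_i32` are hypothetical.

```rust
use std::num::ParseIntError;
use std::str::{self, Utf8Error};

/// Hypothetical helper: decode a raw field the way a string-oriented API
/// has to, by validating UTF-8 and failing on invalid bytes.
fn field_as_str(raw: &[u8]) -> Result<&str, Utf8Error> {
    str::from_utf8(raw)
}

/// Hypothetical helper: convert a textual field into an `i32`, the kind of
/// conversion a typed deserializer needs for a numeric struct field.
fn field_as_i32(raw: &str) -> Result<i32, ParseIntError> {
    raw.parse::<i32>()
}
```

A byte-oriented view of the same data never fails in this way, because the raw bytes are kept as they are; only the conversion to `&str` or to a number can be rejected.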

Implementations§

impl<'r, R> AsyncReader<R>
where R: AsyncRead + Unpin + Send + 'r,

pub fn from_reader(rdr: R) -> AsyncReader<R>

Create a new CSV parser with a default configuration for the given reader.

To customize CSV parsing, use an AsyncReaderBuilder.

§Example
use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
Concord,United States,42695
";
    let rdr = AsyncReader::from_reader(data.as_bytes());
    let mut records = rdr.into_records();
    while let Some(record) = records.next().await {
        println!("{:?}", record?);
    }
    Ok(())
}

pub fn records(&mut self) -> StringRecordsStream<'_, R>

Returns a borrowed stream over all records as strings.

Each item yielded by this stream is a Result<StringRecord, Error>. Therefore, to access the record, callers must handle the possibility of an error (typically with ?).

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.

§Example
use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        println!("{:?}", record?);
    }
    Ok(())
}

pub fn into_records(self) -> StringRecordsIntoStream<'r, R>

Returns an owned stream over all records as strings.

Each item yielded by this stream is a Result<StringRecord, Error>. Therefore, to access the record, callers must handle the possibility of an error (typically with ?).

This is mostly useful when you want to return a CSV stream or store it somewhere.

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.

§Example
use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let rdr = AsyncReader::from_reader(data.as_bytes());
    let mut records = rdr.into_records();
    while let Some(record) = records.next().await {
        println!("{:?}", record?);
    }
    Ok(())
}

pub fn byte_records(&mut self) -> ByteRecordsStream<'_, R>

Returns a borrowed stream over all records as raw bytes.

Each item yielded by this stream is a Result<ByteRecord, Error>. Therefore, to access the record, callers must handle the possibility of an error (typically with ?).

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.

§Example
use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());
    let mut iter = rdr.byte_records();
    assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
    assert!(iter.next().await.is_none());
    Ok(())
}

pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R>

Returns an owned stream over all records as raw bytes.

Each item yielded by this stream is a Result<ByteRecord, Error>. Therefore, to access the record, callers must handle the possibility of an error (typically with ?).

This is mostly useful when you want to return a CSV stream or store it somewhere.

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this does not include the first record.

§Example
use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let rdr = AsyncReader::from_reader(data.as_bytes());
    let mut iter = rdr.into_byte_records();
    assert_eq!(iter.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
    assert!(iter.next().await.is_none());
    Ok(())
}

pub async fn headers(&mut self) -> Result<&StringRecord>

Returns a reference to the first row read by this parser.

If no row has been read yet, then this will force parsing of the first row.

If there was a problem parsing the row or if it wasn’t valid UTF-8, then this returns an error.

If the underlying reader emits EOF before any data, then this returns an empty record.

Note that this method may be used regardless of whether has_headers was enabled (but it is enabled by default).

§Example

This example shows how to get the header row of CSV data. Notice that the header row does not appear as a record in the stream!

use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());

    // We can read the headers before iterating.
    {
        // `headers` borrows from the reader, so we put this in its
        // own scope. That way, the borrow ends before we try iterating
        // below. Alternatively, we could clone the headers.
        let headers = rdr.headers().await?;
        assert_eq!(headers, vec!["city", "country", "pop"]);
    }

    {
        let mut records = rdr.records();
        assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
        assert!(records.next().await.is_none());
    }

    // We can also read the headers after iterating.
    let headers = rdr.headers().await?;
    assert_eq!(headers, vec!["city", "country", "pop"]);
    Ok(())
}

pub async fn byte_headers(&mut self) -> Result<&ByteRecord>

Returns a reference to the first row read by this parser as raw bytes.

If no row has been read yet, then this will force parsing of the first row.

If there was a problem parsing the row then this returns an error.

If the underlying reader emits EOF before any data, then this returns an empty record.

Note that this method may be used regardless of whether has_headers was enabled (but it is enabled by default).

§Example

This example shows how to get the header row of CSV data as raw bytes. Notice that the header row does not appear as a record in the stream!

use std::error::Error;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());

    // We can read the headers before iterating.
    {
        // `headers` borrows from the reader, so we put this in its
        // own scope. That way, the borrow ends before we try iterating
        // below. Alternatively, we could clone the headers.
        let headers = rdr.byte_headers().await?;
        assert_eq!(headers, vec!["city", "country", "pop"]);
    }

    {
        let mut records = rdr.byte_records();
        assert_eq!(records.next().await.unwrap()?, vec!["Boston", "United States", "4628910"]);
        assert!(records.next().await.is_none());
    }

    // We can also read the headers after iterating.
    let headers = rdr.byte_headers().await?;
    assert_eq!(headers, vec!["city", "country", "pop"]);
    Ok(())
}

pub fn set_headers(&mut self, headers: StringRecord)

Set the headers of this CSV parser manually.

This overrides any other setting (including set_byte_headers). Any automatic detection of headers is disabled. This may be called at any time.

§Example
use std::error::Error;
use csv_async::{AsyncReader, StringRecord};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());

    assert_eq!(rdr.headers().await?, vec!["city", "country", "pop"]);
    rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));
    assert_eq!(rdr.headers().await?, vec!["a", "b", "c"]);

    Ok(())
}

pub fn set_byte_headers(&mut self, headers: ByteRecord)

Set the headers of this CSV parser manually as raw bytes.

This overrides any other setting (including set_headers). Any automatic detection of headers is disabled. This may be called at any time.

§Example
use std::error::Error;
use csv_async::{AsyncReader, ByteRecord};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());

    assert_eq!(rdr.byte_headers().await?, vec!["city", "country", "pop"]);
    rdr.set_byte_headers(ByteRecord::from(vec!["a", "b", "c"]));
    assert_eq!(rdr.byte_headers().await?, vec!["a", "b", "c"]);

    Ok(())
}

pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool>

Read a single row into the given record. Returns false when no more records could be read.

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this will treat the initial row as headers and read the first data record.

This method is useful when you want to read records as fast as possible. It’s less ergonomic than a stream, but it permits the caller to reuse the StringRecord allocation, which usually results in higher throughput.

Records read via this method are guaranteed to have a position set on them, even if the reader is at EOF or if an error is returned.

§Example
use std::error::Error;
use csv_async::{AsyncReader, StringRecord};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());
    let mut record = StringRecord::new();

    if rdr.read_record(&mut record).await? {
        assert_eq!(record, vec!["Boston", "United States", "4628910"]);
        Ok(())
    } else {
        Err(From::from("expected at least one record but got none"))
    }
}
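
The allocation-reuse idea behind read_record is not specific to CSV. As a rough standard-library analogy (not csv_async code; the function name `count_lines_reusing_buffer` is hypothetical), the loop below reads every line of an input into one `String` that is cleared and refilled, rather than allocating a fresh value per line.

```rust
use std::io::{self, BufRead};

/// Counts the lines in `input` while reusing a single `String` buffer, the
/// same pattern as passing one record to a `read_record`-style method in a
/// loop.
fn count_lines_reusing_buffer<R: BufRead>(mut input: R) -> io::Result<usize> {
    let mut line = String::new(); // allocated once, reused for every line
    let mut count = 0;
    loop {
        line.clear(); // drops the old contents but keeps the capacity
        if input.read_line(&mut line)? == 0 {
            break; // zero bytes read means end of input
        }
        count += 1;
    }
    Ok(count)
}
```

The trade-off matches the one described above: the caller writes an explicit loop, and in exchange the buffer grows to the largest line once and is then reused.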

pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool>

Read a single row into the given byte record. Returns false when no more records could be read.

If has_headers was enabled via an AsyncReaderBuilder (which is the default), then this will treat the initial row as headers and read the first data record.

This method is useful when you want to read records as fast as possible. It’s less ergonomic than a stream, but it permits the caller to reuse the ByteRecord allocation, which usually results in higher throughput.

Records read via this method are guaranteed to have a position set on them, even if the reader is at EOF or if an error is returned.

§Example
use std::error::Error;
use csv_async::{ByteRecord, AsyncReader};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,pop
Boston,United States,4628910
";
    let mut rdr = AsyncReader::from_reader(data.as_bytes());
    let mut record = ByteRecord::new();

    if rdr.read_byte_record(&mut record).await? {
        assert_eq!(record, vec!["Boston", "United States", "4628910"]);
        Ok(())
    } else {
        Err(From::from("expected at least one record but got none"))
    }
}

pub fn position(&self) -> &Position

Return the current position of this CSV reader.

The byte offset in the position returned can be used to seek this reader. In particular, seeking to a position returned here on the same data will result in parsing the same subsequent record.

§Example: reading the position
use std::error::Error;
use futures::io;
use futures::stream::StreamExt;
use csv_async::{AsyncReader, Position};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
    let rdr = AsyncReader::from_reader(io::Cursor::new(data));
    let mut iter = rdr.into_records();
    let mut pos = Position::new();
    loop {
        let next = iter.next().await;
        if let Some(next) = next {
            pos = next?.position().expect("Cursor should be at some valid position").clone();
        } else {
            break;
        }
    }

    // `pos` should now be the position immediately before the last
    // record.
    assert_eq!(pos.byte(), 51);
    assert_eq!(pos.line(), 3);
    assert_eq!(pos.record(), 2);
    Ok(())
}
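
To see where the byte offset 51 in the example comes from, the following standard-library sketch computes the offset at which each line of the same data starts. The helper `line_start_offsets` is hypothetical and assumes one record per line (no quoted newlines), which csv_async itself does not assume.

```rust
/// Hypothetical helper: byte offset at which each line of `data` starts.
/// Assumes one record per line, unlike a real CSV parser.
fn line_start_offsets(data: &str) -> Vec<u64> {
    let mut offsets = Vec::new();
    let mut offset = 0u64;
    // `split_inclusive` keeps the trailing '\n', so lengths add up exactly.
    for line in data.split_inclusive('\n') {
        offsets.push(offset);
        offset += line.len() as u64;
    }
    offsets
}
```

For the example data, the header row occupies 22 bytes including its newline and the Boston row occupies 29, so the last record starts at byte 22 + 29 = 51.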

pub fn is_done(&self) -> bool

Returns true if and only if this reader has been exhausted.

When this returns true, no more records can be read from this reader (unless it is first moved to another position with seek).

§Example
use std::error::Error;
use futures::io;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
    let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
    assert!(!rdr.is_done());
    {
        let mut records = rdr.records();
        while let Some(record) = records.next().await {
            let _ = record?;
        }
    }
    assert!(rdr.is_done());
    Ok(())
}

pub fn has_headers(&self) -> bool

Returns true if and only if this reader has been configured to interpret the first record as a header record.

pub fn get_ref(&self) -> &R

Returns a reference to the underlying reader.

pub fn get_mut(&mut self) -> &mut R

Returns a mutable reference to the underlying reader.

pub fn into_inner(self) -> R

Unwraps this CSV reader, returning the underlying reader.

Note that any leftover data inside this reader’s internal buffer is lost.
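
The same effect can be observed with the standard library's `BufReader`, which serves here only as an analogy for a buffering reader and says nothing about csv_async's internal buffer size or layout. In this sketch (the function name is hypothetical), a single line is read through the buffer, yet the underlying cursor has already advanced past everything that was buffered.

```rust
use std::io::{BufRead, BufReader, Cursor};

/// Reads one line through a `BufReader`, unwraps it, and reports how far the
/// underlying cursor has advanced.
fn inner_position_after_one_line(data: &str) -> (String, u64) {
    let mut buffered = BufReader::new(Cursor::new(data.as_bytes()));
    let mut first_line = String::new();
    buffered
        .read_line(&mut first_line)
        .expect("reading valid UTF-8 from memory should not fail");
    // `into_inner` discards whatever is still sitting in the buffer.
    let inner = buffered.into_inner();
    (first_line, inner.position())
}
```

For inputs smaller than the buffer, the returned position equals the full input length even though only the first line was consumed, so continuing to read from the unwrapped reader would skip the remaining rows.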

impl<R: AsyncRead + AsyncSeek + Unpin> AsyncReader<R>

pub async fn seek(&mut self, pos: Position) -> Result<()>

Seeks the underlying reader to the position given.

This comes with a few caveats:

  • Any internal buffer associated with this reader is cleared.
  • If the given position does not correspond to a position immediately before the start of a record, then the behavior of this reader is unspecified.
  • Any special logic that skips the first record in the CSV reader when reading or iterating over records is disabled.

If the given position has a byte offset equivalent to the current position, then no seeking is performed.

If the header row has not already been read, then this will attempt to read the header row before seeking. Therefore, it is possible that this returns an error associated with reading CSV data.

Note that seeking is performed based only on the byte offset in the given position. Namely, the record or line numbers in the position may be incorrect, but this will cause any future position generated by this CSV reader to be similarly incorrect.

§Example: seek to parse a record twice
use std::error::Error;
use futures::io;
use futures::stream::StreamExt;
use csv_async::{AsyncReader, Position};

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
    let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
    let mut pos = Position::new();
    {
        let mut records = rdr.records();
        loop {
            let next = records.next().await;
            if let Some(next) = next {
                pos = next?.position().expect("Cursor should be at some valid position").clone();
            } else {
                break;
            }
        }
    }

    {
        // Now seek the reader back to `pos`. This will let us read the
        // last record again.
        rdr.seek(pos).await?;
        let mut records = rdr.into_records();
        if let Some(result) = records.next().await {
            let record = result?;
            assert_eq!(record, vec!["Concord", "United States", "42695"]);
            Ok(())
        } else {
            Err(From::from("expected at least one record but got none"))
        }
    }
}
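
The caveat about positions that do not fall on a record boundary can be illustrated with a synchronous, standard-library sketch. It is not csv_async code: `read_line_at` is a hypothetical helper that seeks an in-memory cursor to a byte offset and returns the line starting there.

```rust
use std::io::{self, BufRead, Cursor, Seek, SeekFrom};

/// Hypothetical helper: seek an in-memory reader to `byte_offset` and read
/// the line that starts there (without its trailing newline).
fn read_line_at(data: &str, byte_offset: u64) -> io::Result<String> {
    let mut cursor = Cursor::new(data.as_bytes());
    cursor.seek(SeekFrom::Start(byte_offset))?;
    let mut line = String::new();
    cursor.read_line(&mut line)?;
    Ok(line.trim_end_matches('\n').to_string())
}
```

With the example data, offset 51 is the start of the last record and yields `Concord,United States,42695`, while offset 55 lands inside the first field and yields the truncated `ord,United States,42695`. A CSV reader positioned mid-record has the same problem, which is why such positions are left unspecified.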

pub async fn seek_raw(&mut self, seek_from: SeekFrom, pos: Position) -> Result<()>

This is like seek, but provides direct control over how the seeking operation is performed via io::SeekFrom.

The pos position given should correspond to the position indicated by seek_from, but this is not required. If the pos position given is incorrect, then the position information returned by this reader will be similarly incorrect.

If the header row has not already been read, then this will attempt to read the header row before seeking. Therefore, it is possible that this returns an error associated with reading CSV data.

Unlike seek, this will always cause an actual seek to be performed.
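
Because seek_raw accepts any SeekFrom variant, the caller is responsible for knowing which absolute offset a relative seek resolves to. The sketch below uses only the standard library's `Cursor` (the helper `resolved_offset` is hypothetical, and csv_async's async seeking is not involved) to show how different variants can name the same byte.

```rust
use std::io::{self, Cursor, Seek, SeekFrom};

/// Hypothetical helper: apply `seek_from` to an in-memory copy of `data` and
/// return the absolute byte offset the reader ends up at.
fn resolved_offset(data: &str, seek_from: SeekFrom) -> io::Result<u64> {
    let mut cursor = Cursor::new(data.as_bytes());
    cursor.seek(seek_from)
}
```

For the 79-byte example data used on this page, `SeekFrom::Start(51)` and `SeekFrom::End(-28)` both resolve to offset 51, the start of the last record. A pos value passed alongside either of them would need a byte offset of 51 to keep later position reports accurate.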

pub async fn rewind(&mut self) -> Result<()>

Rewinds the underlying reader to the first data record.

This function is aware of header presence. After rewind, record streams return the first data record (skipping the header if present), whereas after a seek to the start of the data they return the header row (even if has_headers is set).

§Example: reading the same data multiple times
use std::error::Error;
use futures::io;
use futures::stream::StreamExt;
use csv_async::AsyncReader;

async fn example() -> Result<(), Box<dyn Error>> {
    let data = "\
city,country,popcount
Boston,United States,4628910
Concord,United States,42695
";
    let mut rdr = AsyncReader::from_reader(io::Cursor::new(data));
    let mut output = Vec::new();
    loop {
        let mut records = rdr.records();
        while let Some(rec) = records.next().await {
            output.push(rec?);
        }
        if output.len() >= 6 {
            break;
        } else {
            drop(records);
            rdr.rewind().await?;
        }
    }
    assert_eq!(output,
        vec![
            vec!["Boston", "United States", "4628910"],
            vec!["Concord", "United States", "42695"],
            vec!["Boston", "United States", "4628910"],
            vec!["Concord", "United States", "42695"],
            vec!["Boston", "United States", "4628910"],
            vec!["Concord", "United States", "42695"],
        ]);
    Ok(())
}

Trait Implementations§

impl<R: Debug> Debug for AsyncReader<R>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<R> RefUnwindSafe for AsyncReader<R>
where R: RefUnwindSafe,

impl<R> Send for AsyncReader<R>
where R: Send,

impl<R> Sync for AsyncReader<R>
where R: Sync,

impl<R> Unpin for AsyncReader<R>
where R: Unpin,

impl<R> UnwindSafe for AsyncReader<R>
where R: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.