Crate csv_async

Source
Expand description

The csv-async crate provides a fast and flexible CSV reader and writer, which is intended to be run in asynchronous environment - i.e. inside functions with async attribute called by tasks run by executor. This library does not imply using any particular executor. Unit tests and documentation snippets use either async-std or tokio crates. Synchronous interface for reading and writing CSV files is not contained in this crate, please use csv crate for this. This crate attempts to mimic csv crate API, but there are some exceptions. E.g. configuration builders have create_... factory functions instead of from_... as in csv crate.

§Brief overview

The primary types in this crate are AsyncReader and AsyncWriter for reading and writing CSV data respectively. Or AsyncDeserializer and AsyncSerializer for reading and writing CSV data using interfaces generated by serde_derive macros.

Correspondingly, to support CSV data with custom field or record delimiters (among many other things), you should use either a AsyncReaderBuilder or a AsyncWriterBuilder, depending on whether you’re reading or writing CSV data.

The standard CSV record types are StringRecord and ByteRecord. StringRecord should be used when you know your data to be valid UTF-8. For data that may be invalid UTF-8, ByteRecord is suitable.

Finally, the set of errors is described by the Error type.

The rest of the types in this crate mostly correspond to more detailed errors, position information, configuration knobs or iterator types.

§Setup

In root folder for your project run cargo add csv-async or cargo add --features tokio csv-async to add this crate to your projext.

§Examples

This example shows how to read and write CSV file in asynchronous context and get into some record details.

Sample input file:

city,region,country,population
Southborough,MA,United States,9686
Northbridge,MA,United States,14061
Marlborough,MA,United States,38334
Springfield,MA,United States,152227
Springfield,MO,United States,150443
Springfield,NJ,United States,14976
Concord,NH,United States,42605
use std::error::Error;
use std::process;
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

async fn filter_by_region(region:&str, file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
    // Function reads CSV file that has column named "region" at second position (index = 1).
    // It writes to new file only rows with region equal to passed argument
    // and removes region column.
    let mut rdr = csv_async::AsyncReader::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncWriter::from_writer(
        File::create(file_out).await?
    );
    wri.write_record(rdr
        .headers()
        .await?.into_iter()
        .filter(|h| *h != "region")
    ).await?;
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        let record = record?;
        match record.get(1) {
            Some(reg) if reg == region => 
                wri.write_record(record
                    .iter()
                    .enumerate()
                    .filter(|(i, _)| *i != 1)
                    .map(|(_, s)| s)
                ).await?,
            _ => {},
        }
    }
    Ok(())
}

#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}
use std::error::Error;
use std::process;
#[cfg(feature = "with_serde")]
use serde::{Deserialize, Serialize};
#[cfg(not(feature = "tokio"))]
use futures::stream::StreamExt;
#[cfg(not(feature = "tokio"))]
use async_std::fs::File;
#[cfg(feature = "tokio")]
use tokio1 as tokio;
#[cfg(feature = "tokio")]
use tokio_stream::StreamExt;
#[cfg(feature = "tokio")]
use tokio::fs::File;

#[cfg(feature = "with_serde")]
#[derive(Deserialize, Serialize)]
struct Row {
    city: String,
    region: String,
    country: String,
    population: u64,
}

#[cfg(feature = "with_serde")]
async fn filter_by_region_serde(region:&str, file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
    // Function reads CSV file that has column named "region" at second position (index = 1).
    // It writes to new file only rows with region equal to passed argument.
    let mut rdr = csv_async::AsyncDeserializer::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncSerializer::from_writer(
        File::create(file_out).await?
    );
    let mut records = rdr.deserialize::<Row>();
    while let Some(record) = records.next().await {
        let record = record?;
        if record.region == region {
            wri.serialize(&record).await?;
        }
    }
    Ok(())
}

#[cfg(feature = "with_serde")]
#[cfg(not(feature = "tokio"))]
fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(feature = "with_serde")]
#[cfg(feature = "tokio")]
fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        if let Err(err) = filter_by_region_serde(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region_serde: {}", err);
            process::exit(1);
        }
    });
}

#[cfg(not(feature = "with_serde"))]
fn main() {}

Structs§

AsyncDeserializer
Configured CSV serde deserializer.
AsyncReader
A already configured CSV reader.
AsyncReaderBuilder
Builds a CSV reader with various configuration knobs.
AsyncSerializer
An already configured CSV serde serializer.
AsyncWriter
An already configured CSV writer.
AsyncWriterBuilder
Builds a CSV writer with various configuration knobs.
ByteRecord
A single CSV record stored as raw bytes.
ByteRecordIter
A double-ended iterator over the fields in a byte record.
ByteRecordsIntoStream
An owned stream of records as raw bytes.
ByteRecordsStream
A borrowed stream of records as raw bytes.
DeserializeError
An Serde deserialization error.
DeserializeRecordsIntoStream
A owned stream of deserialized records.
DeserializeRecordsIntoStreamPos
A owned stream of pairs: deserialized records and position in stream before reading record.
DeserializeRecordsStream
A borrowed stream of deserialized records.
DeserializeRecordsStreamPos
A borrowed stream of pairs: deserialized records and position in stream before reading record.
Error
An error that can occur when processing CSV data.
FromUtf8Error
A UTF-8 validation error during record conversion.
IntoInnerError
IntoInnerError occurs when consuming a Writer fails.
Position
A position in CSV data.
StringRecord
A single CSV record stored as valid UTF-8 bytes.
StringRecordIter
An iterator over the fields in a string record.
StringRecordsIntoStream
An owned stream of records as strings.
StringRecordsStream
A borrowed stream of records as strings.
Utf8Error
A UTF-8 validation error.

Enums§

DeserializeErrorKind
The type of a Serde deserialization error.
ErrorKind
The specific type of an error.
QuoteStyle
The quoting style to use when writing CSV data.
Terminator
A record terminator.
Trim
The whitespace preservation behavior when reading CSV data.

Type Aliases§

Result
A type alias for Result<T, csv_async::Error>.