use std::io::SeekFrom;
use ahash::AHashMap;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::{Block, MessageHeaderRef};
use futures::stream::BoxStream;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt};
use polars_error::{polars_bail, polars_err, PolarsResult};
use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::file::{deserialize_footer, get_record_batch};
use super::{Dictionaries, FileMetadata, OutOfSpecKind};
use crate::array::*;
use crate::datatypes::{ArrowSchema, Field};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use crate::record_batch::RecordBatchT;
/// Async [`Stream`] of [`RecordBatchT`]s read block by block from an Arrow IPC file.
///
/// Created with `FileStream::new`; each item is one decoded (and, if a
/// projection was requested, projected) record batch.
pub struct FileStream<'a> {
    /// The boxed stream that actually reads and decodes the batches.
    stream: BoxStream<'a, PolarsResult<RecordBatchT<Box<dyn Array>>>>,
    /// Schema restricted to the projected fields; `None` when no projection was given.
    schema: Option<ArrowSchema>,
    /// Metadata of the whole file, as passed to `FileStream::new`.
    metadata: FileMetadata,
}
impl<'a> FileStream<'a> {
    /// Creates a stream over the record batches of an Arrow IPC file.
    ///
    /// * `reader` - the file to read from; it is moved into the returned stream.
    /// * `metadata` - the file's metadata (e.g. from [`read_file_metadata_async`]).
    /// * `projection` - optional indices into `metadata.schema.fields` selecting the
    ///   columns to read; when given, [`schema`](Self::schema) returns the projected schema.
    /// * `limit` - optional maximum total number of rows to yield; reading stops
    ///   as soon as that many rows have been produced.
    ///
    /// Dictionaries and batches are only read when the stream is polled.
    pub fn new<R>(
        reader: R,
        metadata: FileMetadata,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Self
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        let (projection, schema) = if let Some(projection) = projection {
            let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
            let schema = ArrowSchema {
                fields,
                metadata: metadata.schema.metadata.clone(),
            };
            (Some((p, h)), Some(schema))
        } else {
            (None, None)
        };

        let stream = Self::stream(reader, None, metadata.clone(), projection, limit);
        Self {
            stream,
            metadata,
            schema,
        }
    }

    /// Returns the metadata of the whole file (unaffected by any projection).
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    /// Returns the schema of the batches this stream yields: the projected
    /// schema if a projection was given, otherwise the file's schema.
    pub fn schema(&self) -> &ArrowSchema {
        self.schema.as_ref().unwrap_or(&self.metadata.schema)
    }

    /// Builds the boxed stream that first loads the dictionaries (unless
    /// `dictionaries` is already `Some`) and then reads the file's blocks in
    /// order, stopping early once `limit` rows have been yielded.
    fn stream<R>(
        mut reader: R,
        mut dictionaries: Option<Dictionaries>,
        metadata: FileMetadata,
        projection: Option<(Vec<usize>, AHashMap<usize, usize>)>,
        limit: Option<usize>,
    ) -> BoxStream<'a, PolarsResult<RecordBatchT<Box<dyn Array>>>>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        async_stream::try_stream! {
            cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?;

            // Buffers reused across blocks so each batch does not reallocate.
            let mut meta_buffer = Default::default();
            let mut block_buffer = Default::default();
            let mut scratch = Default::default();
            let mut remaining = limit.unwrap_or(usize::MAX);
            for block in 0..metadata.blocks.len() {
                // Once the row limit is exhausted there is nothing left to yield:
                // stop instead of seeking to, reading and decoding every remaining
                // block only to emit empty batches.
                if remaining == 0 {
                    break;
                }
                let chunk = read_batch(
                    &mut reader,
                    dictionaries
                        .as_mut()
                        .expect("dictionaries are initialized by cached_read_dictionaries"),
                    &metadata,
                    projection.as_ref().map(|x| x.0.as_ref()),
                    Some(remaining),
                    block,
                    &mut meta_buffer,
                    &mut block_buffer,
                    &mut scratch,
                ).await?;
                // Saturate so a batch longer than requested can never wrap the counter.
                remaining = remaining.saturating_sub(chunk.len());

                let chunk = if let Some((_, map)) = &projection {
                    apply_projection(chunk, map)
                } else {
                    chunk
                };

                yield chunk;
            }
        }
        .boxed()
    }
}
/// Delegates polling to the inner boxed stream.
impl<'a> Stream for FileStream<'a> {
    type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `FileStream` is `Unpin`, so it can be unwrapped from its `Pin`; the
        // inner `BoxStream` is already pinned on the heap.
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}
/// Reads the 10-byte trailer at the end of an Arrow IPC file and returns the
/// footer length it declares.
///
/// The trailer is a little-endian `i32` footer length followed by 6 bytes that
/// must equal `ARROW_MAGIC_V2`.
///
/// # Errors
/// Fails with `InvalidFooter` if the magic bytes do not match and with
/// `NegativeFooterLength` if the declared length is negative; I/O errors from
/// seeking or reading are propagated.
async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> PolarsResult<usize> {
    const TRAILER_LEN: usize = 10;

    let mut trailer = [0u8; TRAILER_LEN];
    reader.seek(SeekFrom::End(-(TRAILER_LEN as i64))).await?;
    reader.read_exact(&mut trailer).await?;

    // First four bytes: footer length; the remainder: magic marker.
    let [l0, l1, l2, l3, ..] = trailer;
    if trailer[4..] != ARROW_MAGIC_V2 {
        polars_bail!(oos = OutOfSpecKind::InvalidFooter)
    }

    usize::try_from(i32::from_le_bytes([l0, l1, l2, l3]))
        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))
}
/// Reads and deserializes the footer of an Arrow IPC file into [`FileMetadata`].
///
/// The footer length comes from the 10-byte trailer parsed by
/// [`read_footer_len`]; the footer itself is the run of that many bytes
/// immediately preceding the trailer.
///
/// # Errors
/// Propagates trailer validation errors, I/O errors, allocation failures when
/// reserving the footer buffer, and errors from `deserialize_footer`.
pub async fn read_file_metadata_async<R>(reader: &mut R) -> PolarsResult<FileMetadata>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let footer_len = read_footer_len(reader).await?;

    // Position (relative to the end of the file) of the first footer byte.
    let footer_start = -10 - footer_len as i64;
    reader.seek(SeekFrom::End(footer_start)).await?;

    let mut footer_bytes = Vec::new();
    footer_bytes.try_reserve(footer_len)?;
    reader
        .take(footer_len as u64)
        .read_to_end(&mut footer_bytes)
        .await?;

    deserialize_footer(&footer_bytes, u64::MAX)
}
/// Reads and decodes the record batch described by `metadata.blocks[block]`.
///
/// Seeks to the block's offset and reads the message metadata into
/// `meta_buffer`. The metadata is preceded by an optional continuation marker
/// and a little-endian `i32` length. The function then reads the message's
/// `body_length` bytes into `block_buffer` and decodes them with
/// `read_record_batch`. Both buffers are cleared before being refilled, so a
/// caller can reuse them across blocks. `scratch`, `projection` and `limit` are
/// forwarded to `read_record_batch` unchanged.
///
/// # Panics
/// Panics if `block` is not a valid index into `metadata.blocks`.
///
/// # Errors
/// Returns an out-of-spec error for a negative block offset, metadata length or
/// body length, or for an invalid flatbuffer message. Also propagates errors
/// from `get_record_batch` and `read_record_batch`, I/O errors, and allocation
/// failures.
// Every argument is independent reader state threaded through from the caller
// (reusable buffers, dictionaries, projection, limit), hence the lint allowance.
#[allow(clippy::too_many_arguments)]
async fn read_batch<R>(
    mut reader: R,
    dictionaries: &mut Dictionaries,
    metadata: &FileMetadata,
    projection: Option<&[usize]>,
    limit: Option<usize>,
    block: usize,
    meta_buffer: &mut Vec<u8>,
    block_buffer: &mut Vec<u8>,
    scratch: &mut Vec<u8>,
) -> PolarsResult<RecordBatchT<Box<dyn Array>>>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let block = metadata.blocks[block];

    // A negative offset is malformed block metadata, not a bad footer length;
    // report it like the other negative values decoded in this function.
    let offset: u64 = block
        .offset
        .try_into()
        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
    reader.seek(SeekFrom::Start(offset)).await?;

    // Message prefix: an optional continuation marker, then the metadata
    // length as a little-endian i32.
    let mut meta_buf = [0; 4];
    reader.read_exact(&mut meta_buf).await?;
    if meta_buf == CONTINUATION_MARKER {
        reader.read_exact(&mut meta_buf).await?;
    }
    let meta_len = i32::from_le_bytes(meta_buf)
        .try_into()
        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

    meta_buffer.clear();
    meta_buffer.try_reserve(meta_len)?;
    (&mut reader)
        .take(meta_len as u64)
        .read_to_end(meta_buffer)
        .await?;

    let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

    let batch = get_record_batch(message)?;

    // The body is read from where the metadata read left the reader.
    let block_length: usize = message
        .body_length()
        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))?
        .try_into()
        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

    block_buffer.clear();
    block_buffer.try_reserve(block_length)?;
    reader
        .take(block_length as u64)
        .read_to_end(block_buffer)
        .await?;

    // The body is now in memory, so decoding reads from a cursor starting at 0.
    let mut cursor = std::io::Cursor::new(&block_buffer);

    read_record_batch(
        batch,
        &metadata.schema.fields,
        &metadata.ipc_schema,
        projection,
        limit,
        dictionaries,
        message
            .version()
            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?,
        &mut cursor,
        0,
        metadata.size,
        scratch,
    )
}
/// Reads every dictionary batch listed in `blocks` into a single [`Dictionaries`] map.
///
/// For each block:
/// 1. The message metadata at `block.offset` is read with [`read_dictionary_message`].
/// 2. The message header must be a `DictionaryBatch`; any other message is rejected.
/// 3. The next `block.body_length` bytes are read and decoded with `read_dictionary`.
///
/// `scratch` is passed through to `read_dictionary`.
///
/// # Errors
/// Returns an out-of-spec error for a negative block offset or body length, an
/// invalid flatbuffer message or header, a missing header, or a message that
/// is not a dictionary batch. Propagates I/O errors, allocation failures and
/// errors from `read_dictionary`.
async fn read_dictionaries<R>(
    mut reader: R,
    fields: &[Field],
    ipc_schema: &IpcSchema,
    blocks: &[Block],
    scratch: &mut Vec<u8>,
) -> PolarsResult<Dictionaries>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut dictionaries = Default::default();
    // Reused across blocks: `data` holds the message metadata, `buffer` the body.
    let mut data: Vec<u8> = vec![];
    let mut buffer: Vec<u8> = vec![];

    for block in blocks {
        // Negative offsets/lengths are malformed block entries, not a bad footer
        // length; report them the same way `read_batch` reports negative values.
        let offset: u64 = block
            .offset
            .try_into()
            .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

        let length: usize = block
            .body_length
            .try_into()
            .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

        read_dictionary_message(&mut reader, offset, &mut data).await?;

        let message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref())
            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

        let header = message
            .header()
            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))?
            .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?;

        match header {
            MessageHeaderRef::DictionaryBatch(batch) => {
                // The body is read from where the metadata read left the reader.
                buffer.clear();
                buffer.try_reserve(length)?;
                (&mut reader)
                    .take(length as u64)
                    .read_to_end(&mut buffer)
                    .await?;
                let mut cursor = std::io::Cursor::new(&buffer);
                read_dictionary(
                    batch,
                    fields,
                    ipc_schema,
                    &mut dictionaries,
                    &mut cursor,
                    0,
                    u64::MAX,
                    scratch,
                )?;
            },
            _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType),
        }
    }
    Ok(dictionaries)
}
/// Seeks to `offset` and reads one IPC message's flatbuffer metadata into
/// `data`, clearing `data` first.
///
/// The metadata is preceded by an optional 4-byte continuation marker, which is
/// skipped, and its length as a little-endian `i32`. At most that many bytes are
/// then read. Afterwards the reader is positioned just past the bytes read;
/// [`read_dictionaries`] relies on this to read the message body next.
///
/// # Errors
/// Returns an out-of-spec error if the length prefix is negative. Propagates
/// I/O errors and allocation failures.
async fn read_dictionary_message<R>(
    mut reader: R,
    offset: u64,
    data: &mut Vec<u8>,
) -> PolarsResult<()>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut prefix = [0; 4];
    reader.seek(SeekFrom::Start(offset)).await?;
    reader.read_exact(&mut prefix).await?;
    if prefix == CONTINUATION_MARKER {
        reader.read_exact(&mut prefix).await?;
    }

    // This is the length of the message metadata, not of the file footer, so a
    // negative value is reported like the identical prefix parsed in `read_batch`.
    let meta_len: usize = i32::from_le_bytes(prefix)
        .try_into()
        .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

    data.clear();
    data.try_reserve(meta_len)?;
    (&mut reader)
        .take(meta_len as u64)
        .read_to_end(data)
        .await?;
    Ok(())
}
/// Ensures `dictionaries` is populated, loading it at most once.
///
/// If `dictionaries` is already `Some`, this does nothing. Otherwise it is set
/// to the dictionaries read from the blocks listed in `metadata.dictionaries`.
/// If the file lists no dictionary blocks, it is set to `Default::default()`.
///
/// # Errors
/// Propagates errors from [`read_dictionaries`]; `dictionaries` stays `None` in
/// that case.
async fn cached_read_dictionaries<R: AsyncRead + AsyncSeek + Unpin>(
    reader: &mut R,
    metadata: &FileMetadata,
    dictionaries: &mut Option<Dictionaries>,
) -> PolarsResult<()> {
    if dictionaries.is_some() {
        return Ok(());
    }

    let loaded = match metadata.dictionaries.as_deref() {
        Some(blocks) => {
            read_dictionaries(
                reader,
                &metadata.schema.fields,
                &metadata.ipc_schema,
                blocks,
                &mut Default::default(),
            )
            .await?
        },
        None => Default::default(),
    };
    *dictionaries = Some(loaded);
    Ok(())
}