use std::ffi::{CStr, CString};
use std::ops::DerefMut;
use polars_error::{polars_bail, polars_err, PolarsError, PolarsResult};
use super::{
export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c, ArrowArray,
ArrowArrayStream, ArrowSchema,
};
use crate::array::Array;
use crate::datatypes::Field;
impl Drop for ArrowArrayStream {
fn drop(&mut self) {
match self.release {
None => (),
Some(release) => unsafe { release(self) },
};
}
}
impl ArrowArrayStream {
pub fn empty() -> Self {
Self {
get_schema: None,
get_next: None,
get_last_error: None,
release: None,
private_data: std::ptr::null_mut(),
}
}
}
unsafe fn handle_error(iter: &mut ArrowArrayStream) -> PolarsError {
let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
if error.is_null() {
return polars_err!(ComputeError: "got unspecified external error");
}
let error = unsafe { CStr::from_ptr(error) };
polars_err!(ComputeError: "got external error: {}", error.to_str().unwrap())
}
pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
iter: Iter,
field: Field,
}
impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
pub unsafe fn try_new(mut iter: Iter) -> PolarsResult<Self> {
if iter.release.is_none() {
polars_bail!(InvalidOperation: "the C stream was already released")
};
if iter.get_next.is_none() {
polars_bail!(InvalidOperation: "the C stream must contain a non-null get_next")
};
if iter.get_last_error.is_none() {
polars_bail!(InvalidOperation: "The C stream MUST contain a non-null get_last_error")
};
let mut field = ArrowSchema::empty();
let status = if let Some(f) = iter.get_schema {
unsafe { (f)(&mut *iter, &mut field) }
} else {
polars_bail!(InvalidOperation:
"The C stream MUST contain a non-null get_schema"
)
};
if status != 0 {
return Err(unsafe { handle_error(&mut iter) });
}
let field = unsafe { import_field_from_c(&field)? };
Ok(Self { iter, field })
}
pub fn field(&self) -> &Field {
&self.field
}
pub unsafe fn next(&mut self) -> Option<PolarsResult<Box<dyn Array>>> {
let mut array = ArrowArray::empty();
let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
if status != 0 {
return Some(Err(unsafe { handle_error(&mut self.iter) }));
}
array.release?;
unsafe { import_array_from_c(array, self.field.data_type.clone()) }
.map(Some)
.transpose()
}
}
struct PrivateData {
iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
field: Field,
error: Option<CString>,
}
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
if iter.is_null() {
return 2001;
}
let private = &mut *((*iter).private_data as *mut PrivateData);
match private.iter.next() {
Some(Ok(item)) => {
let item_dt = item.data_type();
let expected_dt = private.field.data_type();
if item_dt != expected_dt {
private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
return 2001; }
std::ptr::write(array, export_array_to_c(item));
private.error = None;
0
},
Some(Err(err)) => {
private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
2001 },
None => {
let a = ArrowArray::empty();
std::ptr::write_unaligned(array, a);
private.error = None;
0
},
}
}
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
if iter.is_null() {
return 2001;
}
let private = &mut *((*iter).private_data as *mut PrivateData);
std::ptr::write(schema, export_field_to_c(&private.field));
0
}
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
if iter.is_null() {
return std::ptr::null();
}
let private = &mut *((*iter).private_data as *mut PrivateData);
private
.error
.as_ref()
.map(|x| x.as_ptr())
.unwrap_or(std::ptr::null())
}
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
if iter.is_null() {
return;
}
let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
(*iter).release = None;
}
pub fn export_iterator(
iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
field: Field,
) -> ArrowArrayStream {
let private_data = Box::new(PrivateData {
iter,
field,
error: None,
});
ArrowArrayStream {
get_schema: Some(get_schema),
get_next: Some(get_next),
get_last_error: Some(get_last_error),
release: Some(release),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}