// polars_ffi/version_0.rs
use polars_core::prelude::{Column, CompatLevel};

use super::*;

/// An FFI exported `Series`.
///
/// The layout is `#[repr(C)]`, so the order and types of the fields are part of the
/// binary interface. Dropping a value runs `release` when it is set.
#[repr(C)]
pub struct SeriesExport {
    // Pointer to the exported Arrow schema describing the series' field.
    field: *mut ArrowSchema,
    // A double ptr, so we can easily release the buffer
    // without dropping the arrays.
    arrays: *mut *mut ArrowArray,
    // Number of array pointers reachable through `arrays`.
    len: usize,
    // Callback that frees the resources behind this export; `None` when there is
    // nothing (left) to release.
    release: Option<unsafe extern "C" fn(arg1: *mut SeriesExport)>,
    // Opaque pointer to exporter-owned state; null for an empty export.
    private_data: *mut std::os::raw::c_void,
}

impl SeriesExport {
    /// Builds a placeholder export: every pointer is null, `len` is zero and no
    /// release callback is installed, so dropping it is a no-op.
    pub fn empty() -> Self {
        use std::ptr::null_mut;

        Self {
            field: null_mut(),
            arrays: null_mut(),
            len: 0,
            release: None,
            private_data: null_mut(),
        }
    }

    /// Reports whether this export carries no private data, which is the case for
    /// a value created by [`SeriesExport::empty`].
    pub fn is_null(&self) -> bool {
        let private: *mut std::os::raw::c_void = self.private_data;
        private.is_null()
    }
}

impl Drop for SeriesExport {
    fn drop(&mut self) {
        if let Some(release) = self.release {
            unsafe { release(self) }
        }
    }
}

// Release callback installed on every [SeriesExport] produced by this module.
//
// Reclaims the `PrivateData` box behind `private_data` and the box allocation of
// each exported array, then clears `release`. A null `e` is ignored.
unsafe extern "C" fn c_release_series_export(e: *mut SeriesExport) {
    if e.is_null() {
        return;
    }
    let export = &mut *e;
    let private = Box::from_raw(export.private_data as *mut PrivateData);
    for &array_ptr in private.arrays.iter() {
        // Free only the heap slot: `ManuallyDrop` keeps the array's own destructor
        // from running here.
        drop(Box::from_raw(array_ptr as *mut ManuallyDrop<ArrowArray>));
    }

    export.release = None;
    // `private` goes out of scope here, dropping the remaining owned state.
}

/// Exports a [`Column`] by viewing it as its materialized `Series` and delegating
/// to [`export_series`].
pub fn export_column(c: &Column) -> SeriesExport {
    let series = c.as_materialized_series();
    export_series(series)
}

/// Exports a `Series` as an FFI-safe [`SeriesExport`].
///
/// The series' name and Arrow dtype (at the newest compat level) are exported as a
/// boxed schema, and every chunk is exported as its own boxed array. The schema box
/// and the buffer of array pointers are moved into a `PrivateData` stored behind
/// `private_data`; `release` is set to `c_release_series_export`, which frees them.
pub fn export_series(s: &Series) -> SeriesExport {
    let field = ArrowField::new(
        s.name().clone(),
        s.dtype().to_arrow(CompatLevel::newest()),
        true,
    );
    let schema = Box::new(ffi::export_field_to_c(&field));

    let mut arrays = (0..s.chunks().len())
        .map(|i| {
            // Make sure we export the logical type.
            let arr = s.to_arrow(i, CompatLevel::newest());
            // `arr` is not used afterwards, so move it into the export instead of
            // cloning it first.
            Box::into_raw(Box::new(ffi::export_array_to_c(arr)))
        })
        .collect::<Box<_>>();

    // Capture the raw views before the owning boxes are moved into `PrivateData`;
    // the heap allocations they point to do not move with the boxes.
    let len = arrays.len();
    let ptr = arrays.as_mut_ptr();
    SeriesExport {
        field: schema.as_ref() as *const ArrowSchema as *mut ArrowSchema,
        arrays: ptr,
        len,
        release: Some(c_release_series_export),
        private_data: Box::into_raw(Box::new(PrivateData { arrays, schema }))
            as *mut std::os::raw::c_void,
    }
}

/// Imports a `Series` from an FFI [`SeriesExport`], consuming the export.
///
/// Every exported array is moved out of the export before anything fallible runs.
/// If importing the field or any chunk then fails, the arrays that were not yet
/// imported are dropped normally instead of being leaked: the export's release
/// callback only frees their boxes, not the arrays themselves.
///
/// # Errors
/// Returns an error if the field or any chunk cannot be imported, or if the chunks
/// cannot be assembled into a `Series`.
///
/// # Safety
/// `SeriesExport` must be valid: `field` must point to a valid schema and `arrays`
/// must point to `len` valid, uniquely owned array pointers.
pub unsafe fn import_series(e: SeriesExport) -> PolarsResult<Series> {
    // Take ownership of all arrays up front so an early return cannot strand them.
    let pointers = std::slice::from_raw_parts(e.arrays, e.len);
    let arrays = pointers
        .iter()
        .map(|ptr| std::ptr::read(*ptr))
        .collect::<Vec<_>>();

    let field = ffi::import_field_from_c(&(*e.field))?;

    let chunks = arrays
        .into_iter()
        .map(|arr| import_array(arr, &(*e.field)))
        .collect::<PolarsResult<Vec<_>>>()?;

    Series::try_from((field.name.clone(), chunks))
}

/// Imports `len` consecutive [`SeriesExport`] values starting at `e`.
///
/// Each entry is moved out of the buffer with a bitwise read and handed to
/// [`import_series`]; the first failure is returned immediately.
///
/// NOTE(review): after a failure the remaining entries are never read, so this
/// function does not release them — confirm the caller's expectations.
///
/// # Safety
/// `SeriesExport` must be valid
pub unsafe fn import_series_buffer(e: *mut SeriesExport, len: usize) -> PolarsResult<Vec<Series>> {
    let mut imported = Vec::with_capacity(len);
    for offset in 0..len {
        let export = std::ptr::read(e.add(offset));
        let series = import_series(export)?;
        imported.push(series);
    }
    Ok(imported)
}

/// Context handed to an expression by its caller.
/// It tells the implementer of the expression what it is allowed to do.
#[derive(Copy, Clone, Debug, Default)]
#[repr(C)]
pub struct CallerContext {
    // Flag bits, counted from the least significant bit:
    // bit 0: PARALLEL
    bitflags: u64,
}

impl CallerContext {
    /// Returns whether bit `k` (0 = least significant) of the flags is set.
    const fn kth_bit_set(&self, k: u64) -> bool {
        let mask = 1u64 << k;
        (self.bitflags & mask) != 0
    }

    /// Sets bit `k` (0 = least significant) of the flags.
    fn set_kth_bit(&mut self, k: u64) {
        let mask = 1u64 << k;
        self.bitflags |= mask;
    }

    /// Whether polars' main engine is already running in parallel, in which case
    /// the plugin should not start parallel work of its own.
    /// When this is `false`, the plugin could use parallelism without (much)
    /// contention with polars' parallelism strategies.
    pub fn parallel(&self) -> bool {
        self.kth_bit_set(0)
    }

    /// Marks this context as being called from a parallel section.
    pub fn _set_parallel(&mut self) {
        self.set_kth_bit(0)
    }
}

#[cfg(test)]
mod test {
    use polars_core::prelude::*;

    use super::*;

    /// Exporting a series and importing it again yields an equal series.
    #[test]
    fn test_ffi() {
        let original = Series::new("a".into(), [1, 2]);
        let exported = export_series(&original);

        // SAFETY: `exported` was just produced by `export_series` and is consumed
        // exactly once.
        let round_tripped = unsafe { import_series(exported) }.unwrap();
        assert_eq!(round_tripped, original);
    }
}