polars_python/lazyframe/
exitable.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use polars::prelude::*;
use pyo3::prelude::*;

use super::PyLazyFrame;
use crate::error::PyPolarsErr;
use crate::PyDataFrame;

#[pymethods]
#[cfg(not(target_arch = "wasm32"))]
impl PyLazyFrame {
    fn collect_concurrently(&self, py: Python) -> PyResult<PyInProcessQuery> {
        let ipq = py.allow_threads(|| {
            let ldf = self.ldf.clone();
            ldf.collect_concurrently().map_err(PyPolarsErr::from)
        })?;
        Ok(PyInProcessQuery { ipq })
    }
}

#[pyclass]
#[cfg(not(target_arch = "wasm32"))]
#[repr(transparent)]
#[derive(Clone)]
pub struct PyInProcessQuery {
    pub ipq: InProcessQuery,
}

#[pymethods]
#[cfg(not(target_arch = "wasm32"))]
impl PyInProcessQuery {
    pub fn cancel(&self, py: Python) {
        py.allow_threads(|| self.ipq.cancel())
    }

    pub fn fetch(&self, py: Python) -> PyResult<Option<PyDataFrame>> {
        let out = py.allow_threads(|| self.ipq.fetch().transpose().map_err(PyPolarsErr::from))?;
        Ok(out.map(|df| df.into()))
    }

    pub fn fetch_blocking(&self, py: Python) -> PyResult<PyDataFrame> {
        let out = py.allow_threads(|| self.ipq.fetch_blocking().map_err(PyPolarsErr::from))?;
        Ok(out.into())
    }
}