1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::logical_plan::FETCH_ROWS;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_core::POOL;
/// Physical-plan node that joins the outputs of two child executors.
pub struct JoinExec {
    /// Executor producing the left frame. Stored as an `Option` so that
    /// `execute` can move it out with `take()`.
    input_left: Option<Box<dyn Executor>>,
    /// Executor producing the right frame. Stored as an `Option` so that
    /// `execute` can move it out with `take()`.
    input_right: Option<Box<dyn Executor>>,
    /// Join type handed to `DataFrame::join` (or checked against
    /// `JoinType::AsOf` for the as-of path).
    how: JoinType,
    /// Expressions evaluated against the left frame; the names of the
    /// resulting series are used as the left join keys.
    left_on: Vec<Arc<dyn PhysicalExpr>>,
    /// Expressions evaluated against the right frame; the names of the
    /// resulting series are used as the right join keys.
    right_on: Vec<Arc<dyn PhysicalExpr>>,
    /// When `true`, both inputs are executed through `POOL.join` instead of
    /// one after the other.
    parallel: bool,
    /// Optional suffix forwarded unchanged to `DataFrame::join`.
    suffix: Option<String>,
    /// Left-hand `by` columns forwarded to `join_asof_by`; only consulted for
    /// as-of joins when both `by` lists are non-empty.
    asof_by_left: Vec<String>,
    /// Right-hand `by` columns forwarded to `join_asof_by`; only consulted for
    /// as-of joins when both `by` lists are non-empty.
    asof_by_right: Vec<String>,
}
impl JoinExec {
    /// Creates a join executor over the two given child executors.
    ///
    /// All arguments are stored as-is; the two inputs are wrapped in `Some`
    /// so that `execute` can later take ownership of them.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        input_left: Box<dyn Executor>,
        input_right: Box<dyn Executor>,
        how: JoinType,
        left_on: Vec<Arc<dyn PhysicalExpr>>,
        right_on: Vec<Arc<dyn PhysicalExpr>>,
        parallel: bool,
        suffix: Option<String>,
        asof_by_left: Vec<String>,
        asof_by_right: Vec<String>,
    ) -> Self {
        // Shadow the boxed inputs with their `Option`-wrapped form so the
        // struct literal below can use field-init shorthand throughout.
        let input_left = Some(input_left);
        let input_right = Some(input_right);

        Self {
            input_left,
            input_right,
            how,
            left_on,
            right_on,
            parallel,
            suffix,
            asof_by_left,
            asof_by_right,
        }
    }
}
impl Executor for JoinExec {
    /// Runs both child executors and joins the frames they produce.
    ///
    /// 1. Both inputs are moved out of `self` and executed: through
    ///    `POOL.join` when `self.parallel` is set, otherwise left first and
    ///    then right on the current thread.
    /// 2. Every expression in `left_on` / `right_on` is evaluated against the
    ///    corresponding frame; the names of the resulting series become the
    ///    join key column names.
    /// 3. With the `asof_join` feature enabled, an `AsOf` join whose two `by`
    ///    lists are both non-empty is dispatched to `join_asof_by`; every
    ///    other case goes through `DataFrame::join`.
    ///
    /// # Errors
    ///
    /// Returns the error of either child executor, of any key expression, or
    /// of the join itself. On the as-of-by path a `PolarsError::ValueError`
    /// is returned unless exactly one key column is given on each side.
    ///
    /// # Panics
    ///
    /// Panics when called more than once on the same `JoinExec`, because the
    /// inputs are moved out of `self` by the first call.
    fn execute<'a>(&'a mut self, state: &'a ExecutionState) -> Result<DataFrame> {
        let mut input_left = self
            .input_left
            .take()
            .expect("JoinExec::execute called more than once: left input already taken");
        let mut input_right = self
            .input_right
            .take()
            .expect("JoinExec::execute called more than once: right input already taken");

        let (df_left, df_right) = if self.parallel {
            // The closures are `move`, so each one gets its own clone of the state.
            let state_left = state.clone();
            let state_right = state.clone();
            // NOTE(review): FETCH_ROWS is accessed via `.with`, so it is presumably
            // a thread-local; the caller's value is copied into each closure so the
            // thread that runs it sees the same setting.
            let fetch_rows = FETCH_ROWS.with(|fetch_rows| fetch_rows.get());
            POOL.join(
                move || {
                    FETCH_ROWS.with(|fr| fr.set(fetch_rows));
                    input_left.execute(&state_left)
                },
                move || {
                    FETCH_ROWS.with(|fr| fr.set(fetch_rows));
                    input_right.execute(&state_right)
                },
            )
        } else {
            (input_left.execute(state), input_right.execute(state))
        };
        let df_left = df_left?;
        let df_right = df_right?;

        // Only the *names* of the evaluated key expressions are needed; the
        // join itself looks the columns up by name in the input frames.
        let left_names = self
            .left_on
            .iter()
            .map(|e| e.evaluate(&df_left, state).map(|s| s.name().to_string()))
            .collect::<Result<Vec<_>>>()?;
        let right_names = self
            .right_on
            .iter()
            .map(|e| e.evaluate(&df_right, state).map(|s| s.name().to_string()))
            .collect::<Result<Vec<_>>>()?;

        #[cfg(feature = "asof_join")]
        let df = if matches!(self.how, JoinType::AsOf)
            && !self.asof_by_right.is_empty()
            && !self.asof_by_left.is_empty()
        {
            // Require exactly one key per side. Matching on the slices (rather
            // than checking `len() > 1` and indexing `[0]`) also rejects empty
            // key lists with an error instead of an out-of-bounds panic.
            match (left_names.as_slice(), right_names.as_slice()) {
                ([left_key], [right_key]) => df_left.join_asof_by(
                    &df_right,
                    left_key,
                    right_key,
                    &self.asof_by_left,
                    &self.asof_by_right,
                ),
                _ => {
                    return Err(PolarsError::ValueError(
                        "only one column allowed in asof join".into(),
                    ));
                }
            }
        } else {
            df_left.join(
                &df_right,
                &left_names,
                &right_names,
                self.how,
                self.suffix.clone(),
            )
        };

        #[cfg(not(feature = "asof_join"))]
        let df = df_left.join(
            &df_right,
            &left_names,
            &right_names,
            self.how,
            self.suffix.clone(),
        );

        if state.verbose {
            eprintln!("{:?} join dataframes finished", self.how);
        };
        df
    }
}