use std::any::Any;
use std::sync::{Arc, Mutex};

use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::{
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
    SendableRecordBatchStream, Statistics,
};
use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

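/// A set of record batches together with the [`MemoryReservation`] that
/// accounts for their memory use.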
#[derive(Debug)]
pub(super) struct ReservedBatches {
    batches: Vec<RecordBatch>,
    // Never read directly; kept alive so the reserved memory stays accounted
    // for while the batches are held.
    #[allow(dead_code)]
    reservation: MemoryReservation,
}

impl ReservedBatches {
    pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
        ReservedBatches {
            batches,
            reservation,
        }
    }
}

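/// A shared, mutable container for the batches produced by one iteration of a
/// recursive query, so they can be replayed by [`WorkTableExec`] in the next
/// iteration ("work table" follows PostgreSQL's recursive CTE terminology).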
#[derive(Debug)]
pub(super) struct WorkTable {
    batches: Mutex<Option<ReservedBatches>>,
}

impl WorkTable {
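    /// Create an empty work table.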
    pub(super) fn new() -> Self {
        Self {
            batches: Mutex::new(None),
        }
    }

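    /// Take the batches previously written to the work table, leaving it
    /// empty. Returns an error if the work table has not been populated.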
    fn take(&self) -> Result<ReservedBatches> {
        self.batches
            .lock()
            .unwrap()
            .take()
            .ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
    }

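    /// Store `batches` in the work table, replacing any previous contents.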
    pub(super) fn update(&self, batches: ReservedBatches) {
        self.batches.lock().unwrap().replace(batches);
    }
}

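/// A temporary "working table" operation: when executed, it publishes whatever
/// batches are currently stored in its [`WorkTable`] handle, unmodified. The
/// handle is filled in externally (this is how recursive queries feed the
/// result of one iteration into the next), so this node only mirrors data
/// produced elsewhere in the plan.
///
/// A minimal construction sketch; the column name and schema below are made up
/// for illustration:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // Hypothetical schema for the working relation of a recursive CTE.
/// let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
/// // `name` identifies the work table handle this plan reads from.
/// let work_table_exec = WorkTableExec::new("recursive_cte".to_string(), schema);
/// ```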
#[derive(Clone, Debug)]
pub struct WorkTableExec {
    /// Name of the work table handle this plan reads from
    name: String,
    /// Schema of the produced record batches
    schema: SchemaRef,
    /// The work table holding the batches to be streamed
    work_table: Arc<WorkTable>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, etc.)
    cache: PlanProperties,
}

impl WorkTableExec {
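    /// Create a new `WorkTableExec` with the given handle name and schema.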
    pub fn new(name: String, schema: SchemaRef) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            name,
            schema,
            metrics: ExecutionPlanMetricsSet::new(),
            work_table: Arc::new(WorkTable::new()),
            cache,
        }
    }

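    /// The name of the work table handle this plan reads from.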
    pub fn name(&self) -> &str {
        &self.name
    }

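    /// The schema of the record batches produced by this plan.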
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

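    /// Create a copy of this plan that reads from the given work table handle
    /// (with fresh metrics) instead of its own.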
    pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
        Self {
            name: self.name.clone(),
            schema: Arc::clone(&self.schema),
            metrics: ExecutionPlanMetricsSet::new(),
            work_table,
            cache: self.cache.clone(),
        }
    }

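    /// Compute the cached plan properties: a single unknown partition,
    /// incremental emission, and bounded output.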
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for WorkTableExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "WorkTableExec: name={}", self.name)
            }
        }
    }
}

impl ExecutionPlan for WorkTableExec {
    fn name(&self) -> &'static str {
        "WorkTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::clone(&self) as Arc<dyn ExecutionPlan>)
    }

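    /// Stream the batches currently stored in the work table.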
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        // The work table is only ever read from a single partition.
        if partition != 0 {
            return internal_err!(
                "WorkTableExec got an invalid partition {partition} (expected 0)"
            );
        }
        let batch = self.work_table.take()?;
        Ok(Box::pin(
            MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
                .with_reservation(batch.reservation),
        ))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};

    #[test]
    fn test_work_table() {
        let work_table = WorkTable::new();
        // Taking from an empty work table is an error.
        assert!(work_table.take().is_err());

        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);

        // Reserve 100 bytes and store a single batch in the work table.
        let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
        let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
        reservation.try_grow(100).unwrap();
        work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
        // Take the contents back out and check them.
        let reserved_batches = work_table.take().unwrap();
        assert_eq!(reserved_batches.batches, vec![batch.clone()]);

        // Hand the reservation over to a `MemoryStream`; the memory stays
        // reserved until the stream is dropped.
        let memory_stream =
            MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
                .unwrap()
                .with_reservation(reserved_batches.reservation);

        assert_eq!(pool.reserved(), 100);

        drop(memory_stream);
        assert_eq!(pool.reserved(), 0);
    }
}