datafusion_physical_plan/
work_table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the work table query plan
19
20use std::any::Any;
21use std::sync::{Arc, Mutex};
22
23use crate::execution_plan::{Boundedness, EmissionType};
24use crate::memory::MemoryStream;
25use crate::{
26    metrics::{ExecutionPlanMetricsSet, MetricsSet},
27    SendableRecordBatchStream, Statistics,
28};
29use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
30
31use arrow::datatypes::SchemaRef;
32use arrow::record_batch::RecordBatch;
33use datafusion_common::{internal_datafusion_err, internal_err, Result};
34use datafusion_execution::memory_pool::MemoryReservation;
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
37
38/// A vector of record batches with a memory reservation.
39#[derive(Debug)]
40pub(super) struct ReservedBatches {
41    batches: Vec<RecordBatch>,
42    #[allow(dead_code)]
43    reservation: MemoryReservation,
44}
45
46impl ReservedBatches {
47    pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
48        ReservedBatches {
49            batches,
50            reservation,
51        }
52    }
53}
54
55/// The name is from PostgreSQL's terminology.
56/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
57/// This table serves as a mirror or buffer between each iteration of a recursive query.
58#[derive(Debug)]
59pub(super) struct WorkTable {
60    batches: Mutex<Option<ReservedBatches>>,
61}
62
63impl WorkTable {
64    /// Create a new work table.
65    pub(super) fn new() -> Self {
66        Self {
67            batches: Mutex::new(None),
68        }
69    }
70
71    /// Take the previously written batches from the work table.
72    /// This will be called by the [`WorkTableExec`] when it is executed.
73    fn take(&self) -> Result<ReservedBatches> {
74        self.batches
75            .lock()
76            .unwrap()
77            .take()
78            .ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
79    }
80
81    /// Update the results of a recursive query iteration to the work table.
82    pub(super) fn update(&self, batches: ReservedBatches) {
83        self.batches.lock().unwrap().replace(batches);
84    }
85}
86
87/// A temporary "working table" operation where the input data will be
88/// taken from the named handle during the execution and will be re-published
89/// as is (kind of like a mirror).
90///
91/// Most notably used in the implementation of recursive queries where the
92/// underlying relation does not exist yet but the data will come as the previous
93/// term is evaluated. This table will be used such that the recursive plan
94/// will register a receiver in the task context and this plan will use that
95/// receiver to get the data and stream it back up so that the batches are available
96/// in the next iteration.
97#[derive(Clone, Debug)]
98pub struct WorkTableExec {
99    /// Name of the relation handler
100    name: String,
101    /// The schema of the stream
102    schema: SchemaRef,
103    /// The work table
104    work_table: Arc<WorkTable>,
105    /// Execution metrics
106    metrics: ExecutionPlanMetricsSet,
107    /// Cache holding plan properties like equivalences, output partitioning etc.
108    cache: PlanProperties,
109}
110
111impl WorkTableExec {
112    /// Create a new execution plan for a worktable exec.
113    pub fn new(name: String, schema: SchemaRef) -> Self {
114        let cache = Self::compute_properties(Arc::clone(&schema));
115        Self {
116            name,
117            schema,
118            metrics: ExecutionPlanMetricsSet::new(),
119            work_table: Arc::new(WorkTable::new()),
120            cache,
121        }
122    }
123
124    /// Ref to name
125    pub fn name(&self) -> &str {
126        &self.name
127    }
128
129    /// Arc clone of ref to schema
130    pub fn schema(&self) -> SchemaRef {
131        Arc::clone(&self.schema)
132    }
133
134    pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
135        Self {
136            name: self.name.clone(),
137            schema: Arc::clone(&self.schema),
138            metrics: ExecutionPlanMetricsSet::new(),
139            work_table,
140            cache: self.cache.clone(),
141        }
142    }
143
144    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
145    fn compute_properties(schema: SchemaRef) -> PlanProperties {
146        PlanProperties::new(
147            EquivalenceProperties::new(schema),
148            Partitioning::UnknownPartitioning(1),
149            EmissionType::Incremental,
150            Boundedness::Bounded,
151        )
152    }
153}
154
155impl DisplayAs for WorkTableExec {
156    fn fmt_as(
157        &self,
158        t: DisplayFormatType,
159        f: &mut std::fmt::Formatter,
160    ) -> std::fmt::Result {
161        match t {
162            DisplayFormatType::Default | DisplayFormatType::Verbose => {
163                write!(f, "WorkTableExec: name={}", self.name)
164            }
165        }
166    }
167}
168
169impl ExecutionPlan for WorkTableExec {
170    fn name(&self) -> &'static str {
171        "WorkTableExec"
172    }
173
174    fn as_any(&self) -> &dyn Any {
175        self
176    }
177
178    fn properties(&self) -> &PlanProperties {
179        &self.cache
180    }
181
182    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
183        vec![]
184    }
185
186    fn maintains_input_order(&self) -> Vec<bool> {
187        vec![false]
188    }
189
190    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
191        vec![false]
192    }
193
194    fn with_new_children(
195        self: Arc<Self>,
196        _: Vec<Arc<dyn ExecutionPlan>>,
197    ) -> Result<Arc<dyn ExecutionPlan>> {
198        Ok(Arc::clone(&self) as Arc<dyn ExecutionPlan>)
199    }
200
201    /// Stream the batches that were written to the work table.
202    fn execute(
203        &self,
204        partition: usize,
205        _context: Arc<TaskContext>,
206    ) -> Result<SendableRecordBatchStream> {
207        // WorkTable streams must be the plan base.
208        if partition != 0 {
209            return internal_err!(
210                "WorkTableExec got an invalid partition {partition} (expected 0)"
211            );
212        }
213        let batch = self.work_table.take()?;
214        Ok(Box::pin(
215            MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
216                .with_reservation(batch.reservation),
217        ))
218    }
219
220    fn metrics(&self) -> Option<MetricsSet> {
221        Some(self.metrics.clone_inner())
222    }
223
224    fn statistics(&self) -> Result<Statistics> {
225        Ok(Statistics::new_unknown(&self.schema()))
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};

    /// Round-trips one batch through the work table and checks that the
    /// memory reservation stays alive until the consuming stream is dropped.
    #[test]
    fn test_work_table() {
        let table = WorkTable::new();
        // Nothing has been written yet, so taking must fail
        assert!(table.take().is_err());

        // Reserve some memory to travel with the batch
        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);
        reservation.try_grow(100).unwrap();

        // Write a single batch to the work table
        let values: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
        let batch = RecordBatch::try_from_iter(vec![("col", values)]).unwrap();
        let schema = batch.schema();
        table.update(ReservedBatches::new(vec![batch.clone()], reservation));

        // Read it back
        let ReservedBatches {
            batches,
            reservation,
        } = table.take().unwrap();
        assert_eq!(batches, vec![batch]);

        // Hand the batch and its reservation over to a MemoryStream
        let stream = MemoryStream::try_new(batches, schema, None)
            .unwrap()
            .with_reservation(reservation);

        // The memory is still reserved while the stream is alive
        assert_eq!(pool.reserved(), 100);

        // Dropping the stream releases the reservation
        drop(stream);
        assert_eq!(pool.reserved(), 0);
    }
}
266}