datafusion_physical_plan/
streaming.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Generic plans for deferred execution: [`StreamingTableExec`] and [`PartitionStream`]
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use super::{DisplayAs, DisplayFormatType, PlanProperties};
25use crate::display::{display_orderings, ProjectSchemaDisplay};
26use crate::execution_plan::{Boundedness, EmissionType};
27use crate::limit::LimitStream;
28use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
29use crate::projection::{
30    all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec,
31};
32use crate::stream::RecordBatchStreamAdapter;
33use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
34
35use arrow::datatypes::{Schema, SchemaRef};
36use datafusion_common::{internal_err, plan_err, Result};
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
39
40use async_trait::async_trait;
41use futures::stream::StreamExt;
42use log::debug;
43
44/// A partition that can be converted into a [`SendableRecordBatchStream`]
45///
46/// Combined with [`StreamingTableExec`], you can use this trait to implement
47/// [`ExecutionPlan`] for a custom source with less boiler plate than
48/// implementing `ExecutionPlan` directly for many use cases.
49pub trait PartitionStream: Debug + Send + Sync {
50    /// Returns the schema of this partition
51    fn schema(&self) -> &SchemaRef;
52
53    /// Returns a stream yielding this partitions values
54    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
55}
56
57/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
58///
59/// If your source can be represented as one or more [`PartitionStream`]s, you can
60/// use this struct to implement [`ExecutionPlan`].
61#[derive(Clone)]
62pub struct StreamingTableExec {
63    partitions: Vec<Arc<dyn PartitionStream>>,
64    projection: Option<Arc<[usize]>>,
65    projected_schema: SchemaRef,
66    projected_output_ordering: Vec<LexOrdering>,
67    infinite: bool,
68    limit: Option<usize>,
69    cache: PlanProperties,
70    metrics: ExecutionPlanMetricsSet,
71}
72
73impl StreamingTableExec {
74    /// Try to create a new [`StreamingTableExec`] returning an error if the schema is incorrect
75    pub fn try_new(
76        schema: SchemaRef,
77        partitions: Vec<Arc<dyn PartitionStream>>,
78        projection: Option<&Vec<usize>>,
79        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
80        infinite: bool,
81        limit: Option<usize>,
82    ) -> Result<Self> {
83        for x in partitions.iter() {
84            let partition_schema = x.schema();
85            if !schema.eq(partition_schema) {
86                debug!(
87                    "Target schema does not match with partition schema. \
88                        Target_schema: {schema:?}. Partition Schema: {partition_schema:?}"
89                );
90                return plan_err!("Mismatch between schema and batches");
91            }
92        }
93
94        let projected_schema = match projection {
95            Some(p) => Arc::new(schema.project(p)?),
96            None => schema,
97        };
98        let projected_output_ordering =
99            projected_output_ordering.into_iter().collect::<Vec<_>>();
100        let cache = Self::compute_properties(
101            Arc::clone(&projected_schema),
102            &projected_output_ordering,
103            &partitions,
104            infinite,
105        );
106        Ok(Self {
107            partitions,
108            projected_schema,
109            projection: projection.cloned().map(Into::into),
110            projected_output_ordering,
111            infinite,
112            limit,
113            cache,
114            metrics: ExecutionPlanMetricsSet::new(),
115        })
116    }
117
118    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
119        &self.partitions
120    }
121
122    pub fn partition_schema(&self) -> &SchemaRef {
123        self.partitions[0].schema()
124    }
125
126    pub fn projection(&self) -> &Option<Arc<[usize]>> {
127        &self.projection
128    }
129
130    pub fn projected_schema(&self) -> &Schema {
131        &self.projected_schema
132    }
133
134    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
135        self.projected_output_ordering.clone()
136    }
137
138    pub fn is_infinite(&self) -> bool {
139        self.infinite
140    }
141
142    pub fn limit(&self) -> Option<usize> {
143        self.limit
144    }
145
146    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
147    fn compute_properties(
148        schema: SchemaRef,
149        orderings: &[LexOrdering],
150        partitions: &[Arc<dyn PartitionStream>],
151        infinite: bool,
152    ) -> PlanProperties {
153        // Calculate equivalence properties:
154        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
155
156        // Get output partitioning:
157        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
158        let boundedness = if infinite {
159            Boundedness::Unbounded {
160                requires_infinite_memory: false,
161            }
162        } else {
163            Boundedness::Bounded
164        };
165        PlanProperties::new(
166            eq_properties,
167            output_partitioning,
168            EmissionType::Incremental,
169            boundedness,
170        )
171    }
172}
173
174impl Debug for StreamingTableExec {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("LazyMemTableExec").finish_non_exhaustive()
177    }
178}
179
180impl DisplayAs for StreamingTableExec {
181    fn fmt_as(
182        &self,
183        t: DisplayFormatType,
184        f: &mut std::fmt::Formatter,
185    ) -> std::fmt::Result {
186        match t {
187            DisplayFormatType::Default | DisplayFormatType::Verbose => {
188                write!(
189                    f,
190                    "StreamingTableExec: partition_sizes={:?}",
191                    self.partitions.len(),
192                )?;
193                if !self.projected_schema.fields().is_empty() {
194                    write!(
195                        f,
196                        ", projection={}",
197                        ProjectSchemaDisplay(&self.projected_schema)
198                    )?;
199                }
200                if self.infinite {
201                    write!(f, ", infinite_source=true")?;
202                }
203                if let Some(fetch) = self.limit {
204                    write!(f, ", fetch={fetch}")?;
205                }
206
207                display_orderings(f, &self.projected_output_ordering)?;
208
209                Ok(())
210            }
211        }
212    }
213}
214
215#[async_trait]
216impl ExecutionPlan for StreamingTableExec {
217    fn name(&self) -> &'static str {
218        "StreamingTableExec"
219    }
220
221    fn as_any(&self) -> &dyn Any {
222        self
223    }
224
225    fn properties(&self) -> &PlanProperties {
226        &self.cache
227    }
228
229    fn fetch(&self) -> Option<usize> {
230        self.limit
231    }
232
233    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
234        vec![]
235    }
236
237    fn with_new_children(
238        self: Arc<Self>,
239        children: Vec<Arc<dyn ExecutionPlan>>,
240    ) -> Result<Arc<dyn ExecutionPlan>> {
241        if children.is_empty() {
242            Ok(self)
243        } else {
244            internal_err!("Children cannot be replaced in {self:?}")
245        }
246    }
247
248    fn execute(
249        &self,
250        partition: usize,
251        ctx: Arc<TaskContext>,
252    ) -> Result<SendableRecordBatchStream> {
253        let stream = self.partitions[partition].execute(ctx);
254        let projected_stream = match self.projection.clone() {
255            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
256                Arc::clone(&self.projected_schema),
257                stream.map(move |x| {
258                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
259                }),
260            )),
261            None => stream,
262        };
263        Ok(match self.limit {
264            None => projected_stream,
265            Some(fetch) => {
266                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
267                Box::pin(LimitStream::new(
268                    projected_stream,
269                    0,
270                    Some(fetch),
271                    baseline_metrics,
272                ))
273            }
274        })
275    }
276
277    /// Tries to embed `projection` to its input (`streaming table`).
278    /// If possible, returns [`StreamingTableExec`] as the top plan. Otherwise,
279    /// returns `None`.
280    fn try_swapping_with_projection(
281        &self,
282        projection: &ProjectionExec,
283    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
284        if !all_alias_free_columns(projection.expr()) {
285            return Ok(None);
286        }
287
288        let streaming_table_projections =
289            self.projection().as_ref().map(|i| i.as_ref().to_vec());
290        let new_projections = new_projections_for_columns(
291            projection,
292            &streaming_table_projections
293                .unwrap_or((0..self.schema().fields().len()).collect()),
294        );
295
296        let mut lex_orderings = vec![];
297        for lex_ordering in self.projected_output_ordering().into_iter() {
298            let mut orderings = LexOrdering::default();
299            for order in lex_ordering {
300                let Some(new_ordering) =
301                    update_expr(&order.expr, projection.expr(), false)?
302                else {
303                    return Ok(None);
304                };
305                orderings.push(PhysicalSortExpr {
306                    expr: new_ordering,
307                    options: order.options,
308                });
309            }
310            lex_orderings.push(orderings);
311        }
312
313        StreamingTableExec::try_new(
314            Arc::clone(self.partition_schema()),
315            self.partitions().clone(),
316            Some(new_projections.as_ref()),
317            lex_orderings,
318            self.is_infinite(),
319            self.limit(),
320        )
321        .map(|e| Some(Arc::new(e) as _))
322    }
323
324    fn metrics(&self) -> Option<MetricsSet> {
325        Some(self.metrics.clone_inner())
326    }
327
328    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
329        Some(Arc::new(StreamingTableExec {
330            partitions: self.partitions.clone(),
331            projection: self.projection.clone(),
332            projected_schema: Arc::clone(&self.projected_schema),
333            projected_output_ordering: self.projected_output_ordering.clone(),
334            infinite: self.infinite,
335            limit,
336            cache: self.cache.clone(),
337            metrics: self.metrics.clone(),
338        }))
339    }
340}
341
#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    /// Without a limit, every row of every batch is returned.
    #[tokio::test]
    async fn test_no_limit() {
        // Two batches of 100 rows each, served by a single partition
        let batches = vec![make_partition(100), make_partition(100)];
        let plan = TestBuilder::new().with_batches(batches).build();

        let row_counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(row_counts, vec![200]);
    }

    /// With a limit, only that many rows come back.
    #[tokio::test]
    async fn test_limit() {
        // Two batches of 100 rows each, served by a single partition
        let batches = vec![make_partition(100), make_partition(100)];
        let plan = TestBuilder::new()
            .with_batches(batches)
            // Only the first 75 rows should be returned
            .with_limit(Some(75))
            .build();

        let row_counts = collect_num_rows(Arc::new(plan)).await;
        assert_eq!(row_counts, vec![75]);
    }

    /// Executes `exec` and reports, for each partition, the total number of
    /// rows it produced.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let task_ctx = Arc::new(TaskContext::default());
        let per_partition = collect_partitioned(exec, task_ctx).await.unwrap();

        let mut row_counts = Vec::with_capacity(per_partition.len());
        for batches in &per_partition {
            let rows = batches.iter().map(RecordBatch::num_rows).sum::<usize>();
            row_counts.push(rows);
        }
        row_counts
    }

    /// Builder that collects the arguments of [`StreamingTableExec::try_new`]
    /// so each test only spells out what it cares about.
    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        /// Starts from the all-default configuration.
        fn new() -> Self {
            Self::default()
        }

        /// Uses `batches` as the content of a single partition and adopts
        /// that partition's schema as the table schema.
        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            let partition_stream = TestPartitionStream::new_with_batches(batches);
            self.schema = Some(Arc::clone(partition_stream.schema()));
            self.partitions = vec![Arc::new(partition_stream)];
            self
        }

        /// Sets the row limit passed to the plan.
        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Creates the plan, panicking if no batches were provided or if
        /// construction fails.
        fn build(self) -> StreamingTableExec {
            let Self {
                schema,
                partitions,
                projection,
                projected_output_ordering,
                infinite,
                limit,
            } = self;

            StreamingTableExec::try_new(
                schema.unwrap(),
                partitions,
                projection.as_ref(),
                projected_output_ordering,
                infinite,
                limit,
            )
            .unwrap()
        }
    }
}
426}