// datafusion_physical_plan/streaming.rs

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use super::{DisplayAs, DisplayFormatType, PlanProperties};
use crate::display::{display_orderings, ProjectSchemaDisplay};
use crate::execution_plan::{Boundedness, EmissionType};
use crate::limit::LimitStream;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::projection::{
    all_alias_free_columns, new_projections_for_columns, update_expr, ProjectionExec,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{internal_err, plan_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};

use async_trait::async_trait;
use futures::stream::StreamExt;
use log::debug;

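/// A partition that can be converted into a [`SendableRecordBatchStream`].
///
/// Combined with [`StreamingTableExec`], this trait lets a custom streaming
/// source plug into the physical plan with little boilerplate.
///
/// # Example
///
/// A minimal illustrative sketch (not part of this file); the
/// `InMemoryPartition` type is hypothetical:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::SchemaRef;
/// use arrow::record_batch::RecordBatch;
/// use datafusion_execution::TaskContext;
/// use futures::stream;
///
/// use crate::stream::RecordBatchStreamAdapter;
/// use crate::streaming::PartitionStream;
/// use crate::SendableRecordBatchStream;
///
/// #[derive(Debug)]
/// struct InMemoryPartition {
///     schema: SchemaRef,
///     batches: Vec<RecordBatch>,
/// }
///
/// impl PartitionStream for InMemoryPartition {
///     fn schema(&self) -> &SchemaRef {
///         &self.schema
///     }
///
///     fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
///         // Wrap the in-memory batches in a stream adapter that yields
///         // them one at a time.
///         Box::pin(RecordBatchStreamAdapter::new(
///             Arc::clone(&self.schema),
///             stream::iter(self.batches.clone().into_iter().map(Ok)),
///         ))
///     }
/// }
/// ```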
pub trait PartitionStream: Debug + Send + Sync {
    /// Returns the schema of this partition.
    fn schema(&self) -> &SchemaRef;

    /// Returns a stream yielding this partition's values.
    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}

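/// An [`ExecutionPlan`] for one or more [`PartitionStream`]s.
///
/// If a data source can be represented as a set of [`PartitionStream`]s, this
/// plan streams batches from each partition, applying an optional projection
/// and limit.
///
/// # Example
///
/// A hedged construction sketch, assuming `partition` is an
/// `Arc<dyn PartitionStream>` (e.g. the `InMemoryPartition` sketch above):
///
/// ```ignore
/// let exec = StreamingTableExec::try_new(
///     Arc::clone(partition.schema()),
///     vec![partition],
///     None,    // no projection
///     vec![],  // no output ordering
///     false,   // bounded source
///     None,    // no limit
/// )?;
/// ```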
#[derive(Clone)]
pub struct StreamingTableExec {
    partitions: Vec<Arc<dyn PartitionStream>>,
    projection: Option<Arc<[usize]>>,
    projected_schema: SchemaRef,
    projected_output_ordering: Vec<LexOrdering>,
    infinite: bool,
    limit: Option<usize>,
    cache: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}

impl StreamingTableExec {
    /// Tries to create a new [`StreamingTableExec`], validating that every
    /// partition's schema matches the provided `schema`.
    pub fn try_new(
        schema: SchemaRef,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<&Vec<usize>>,
        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    ) -> Result<Self> {
        for x in partitions.iter() {
            let partition_schema = x.schema();
            if !schema.eq(partition_schema) {
                debug!(
                    "Target schema does not match with partition schema. \
                     Target schema: {schema:?}. Partition schema: {partition_schema:?}"
                );
                return plan_err!("Mismatch between schema and batches");
            }
        }

        let projected_schema = match projection {
            Some(p) => Arc::new(schema.project(p)?),
            None => schema,
        };
        let projected_output_ordering =
            projected_output_ordering.into_iter().collect::<Vec<_>>();
        let cache = Self::compute_properties(
            Arc::clone(&projected_schema),
            &projected_output_ordering,
            &partitions,
            infinite,
        );
        Ok(Self {
            partitions,
            projected_schema,
            projection: projection.cloned().map(Into::into),
            projected_output_ordering,
            infinite,
            limit,
            cache,
            metrics: ExecutionPlanMetricsSet::new(),
        })
    }

    pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
        &self.partitions
    }

    pub fn partition_schema(&self) -> &SchemaRef {
        self.partitions[0].schema()
    }

    pub fn projection(&self) -> &Option<Arc<[usize]>> {
        &self.projection
    }

    pub fn projected_schema(&self) -> &Schema {
        &self.projected_schema
    }

    pub fn projected_output_ordering(&self) -> impl IntoIterator<Item = LexOrdering> {
        self.projected_output_ordering.clone()
    }

    pub fn is_infinite(&self) -> bool {
        self.infinite
    }

    pub fn limit(&self) -> Option<usize> {
        self.limit
    }

    /// Creates the cache object that stores the plan properties such as
    /// schema, equivalence properties, ordering, and partitioning.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        partitions: &[Arc<dyn PartitionStream>],
        infinite: bool,
    ) -> PlanProperties {
        // Calculate equivalence properties:
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

        // Get output partitioning:
        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());
        let boundedness = if infinite {
            Boundedness::Unbounded {
                requires_infinite_memory: false,
            }
        } else {
            Boundedness::Bounded
        };
        PlanProperties::new(
            eq_properties,
            output_partitioning,
            EmissionType::Incremental,
            boundedness,
        )
    }
}

impl Debug for StreamingTableExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingTableExec").finish_non_exhaustive()
    }
}

impl DisplayAs for StreamingTableExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "StreamingTableExec: partition_sizes={:?}",
                    self.partitions.len(),
                )?;
                if !self.projected_schema.fields().is_empty() {
                    write!(
                        f,
                        ", projection={}",
                        ProjectSchemaDisplay(&self.projected_schema)
                    )?;
                }
                if self.infinite {
                    write!(f, ", infinite_source=true")?;
                }
                if let Some(fetch) = self.limit {
                    write!(f, ", fetch={fetch}")?;
                }

                display_orderings(f, &self.projected_output_ordering)?;

                Ok(())
            }
        }
    }
}

#[async_trait]
impl ExecutionPlan for StreamingTableExec {
    fn name(&self) -> &'static str {
        "StreamingTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in {self:?}")
        }
    }

    fn execute(
        &self,
        partition: usize,
        ctx: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = self.partitions[partition].execute(ctx);
        // Apply the projection, if any, by mapping each batch through it.
        let projected_stream = match self.projection.clone() {
            Some(projection) => Box::pin(RecordBatchStreamAdapter::new(
                Arc::clone(&self.projected_schema),
                stream.map(move |x| {
                    x.and_then(|b| b.project(projection.as_ref()).map_err(Into::into))
                }),
            )),
            None => stream,
        };
        // Apply the limit, if any, by wrapping the stream in a `LimitStream`.
        Ok(match self.limit {
            None => projected_stream,
            Some(fetch) => {
                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                Box::pin(LimitStream::new(
                    projected_stream,
                    0,
                    Some(fetch),
                    baseline_metrics,
                ))
            }
        })
    }

    /// Tries to embed `projection` into its input (the streaming table).
    /// If possible, returns [`StreamingTableExec`] as the top plan; otherwise,
    /// returns `None`.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // Only push down projections consisting of alias-free column references.
        if !all_alias_free_columns(projection.expr()) {
            return Ok(None);
        }

        let streaming_table_projections =
            self.projection().as_ref().map(|i| i.as_ref().to_vec());
        let new_projections = new_projections_for_columns(
            projection,
            &streaming_table_projections
                .unwrap_or((0..self.schema().fields().len()).collect()),
        );

        let mut lex_orderings = vec![];
        for lex_ordering in self.projected_output_ordering().into_iter() {
            let mut orderings = LexOrdering::default();
            for order in lex_ordering {
                let Some(new_ordering) =
                    update_expr(&order.expr, projection.expr(), false)?
                else {
                    return Ok(None);
                };
                orderings.push(PhysicalSortExpr {
                    expr: new_ordering,
                    options: order.options,
                });
            }
            lex_orderings.push(orderings);
        }

        StreamingTableExec::try_new(
            Arc::clone(self.partition_schema()),
            self.partitions().clone(),
            Some(new_projections.as_ref()),
            lex_orderings,
            self.is_infinite(),
            self.limit(),
        )
        .map(|e| Some(Arc::new(e) as _))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(StreamingTableExec {
            partitions: self.partitions.clone(),
            projection: self.projection.clone(),
            projected_schema: Arc::clone(&self.projected_schema),
            projected_output_ordering: self.projected_output_ordering.clone(),
            infinite: self.infinite,
            limit,
            cache: self.cache.clone(),
            metrics: self.metrics.clone(),
        }))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::collect_partitioned;
    use crate::streaming::PartitionStream;
    use crate::test::{make_partition, TestPartitionStream};
    use arrow::record_batch::RecordBatch;

    #[tokio::test]
    async fn test_no_limit() {
        let exec = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![200]);
    }

    #[tokio::test]
    async fn test_limit() {
        let exec = TestBuilder::new()
            .with_batches(vec![make_partition(100), make_partition(100)])
            .with_limit(Some(75))
            .build();

        let counts = collect_num_rows(Arc::new(exec)).await;
        assert_eq!(counts, vec![75]);
    }
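
    // A hedged sketch (not in the original tests): checks that `with_fetch`
    // returns a new plan carrying the updated limit, reusing the helpers
    // above. Assumes, as the tests above do, that `make_partition(n)`
    // produces an `n`-row batch.
    #[tokio::test]
    async fn test_with_fetch() {
        let exec = TestBuilder::new()
            .with_batches(vec![make_partition(100)])
            .build();

        let exec = exec.with_fetch(Some(10)).expect("limit pushdown supported");
        assert_eq!(exec.fetch(), Some(10));

        let counts = collect_num_rows(exec).await;
        assert_eq!(counts, vec![10]);
    }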

    /// Collects all batches from each partition of `exec` and returns the
    /// per-partition row counts.
    async fn collect_num_rows(exec: Arc<dyn ExecutionPlan>) -> Vec<usize> {
        let ctx = Arc::new(TaskContext::default());
        let partition_batches = collect_partitioned(exec, ctx).await.unwrap();
        partition_batches
            .into_iter()
            .map(|batches| batches.iter().map(|b| b.num_rows()).sum::<usize>())
            .collect()
    }

    #[derive(Default)]
    struct TestBuilder {
        schema: Option<SchemaRef>,
        partitions: Vec<Arc<dyn PartitionStream>>,
        projection: Option<Vec<usize>>,
        projected_output_ordering: Vec<LexOrdering>,
        infinite: bool,
        limit: Option<usize>,
    }

    impl TestBuilder {
        fn new() -> Self {
            Self::default()
        }

        fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
            let stream = TestPartitionStream::new_with_batches(batches);
            self.schema = Some(Arc::clone(stream.schema()));
            self.partitions = vec![Arc::new(stream)];
            self
        }

        fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        fn build(self) -> StreamingTableExec {
            StreamingTableExec::try_new(
                self.schema.unwrap(),
                self.partitions,
                self.projection.as_ref(),
                self.projected_output_ordering,
                self.infinite,
                self.limit,
            )
            .unwrap()
        }
    }
}