datafusion_physical_plan/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for reading in-memory batches of data
19
20use std::any::Any;
21use std::fmt;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use crate::execution_plan::{Boundedness, EmissionType};
26use crate::{
27    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28    RecordBatchStream, SendableRecordBatchStream, Statistics,
29};
30
31use arrow::array::RecordBatch;
32use arrow::datatypes::SchemaRef;
33use datafusion_common::{internal_err, Result};
34use datafusion_execution::memory_pool::MemoryReservation;
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::EquivalenceProperties;
37
38use futures::Stream;
39use parking_lot::RwLock;
40
41/// Iterator over batches
42pub struct MemoryStream {
43    /// Vector of record batches
44    data: Vec<RecordBatch>,
45    /// Optional memory reservation bound to the data, freed on drop
46    reservation: Option<MemoryReservation>,
47    /// Schema representing the data
48    schema: SchemaRef,
49    /// Optional projection for which columns to load
50    projection: Option<Vec<usize>>,
51    /// Index into the data
52    index: usize,
53    /// The remaining number of rows to return. If None, all rows are returned
54    fetch: Option<usize>,
55}
56
57impl MemoryStream {
58    /// Create an iterator for a vector of record batches
59    pub fn try_new(
60        data: Vec<RecordBatch>,
61        schema: SchemaRef,
62        projection: Option<Vec<usize>>,
63    ) -> Result<Self> {
64        Ok(Self {
65            data,
66            reservation: None,
67            schema,
68            projection,
69            index: 0,
70            fetch: None,
71        })
72    }
73
74    /// Set the memory reservation for the data
75    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
76        self.reservation = Some(reservation);
77        self
78    }
79
80    /// Set the number of rows to produce
81    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
82        self.fetch = fetch;
83        self
84    }
85}
86
87impl Stream for MemoryStream {
88    type Item = Result<RecordBatch>;
89
90    fn poll_next(
91        mut self: std::pin::Pin<&mut Self>,
92        _: &mut Context<'_>,
93    ) -> Poll<Option<Self::Item>> {
94        if self.index >= self.data.len() {
95            return Poll::Ready(None);
96        }
97        self.index += 1;
98        let batch = &self.data[self.index - 1];
99        // return just the columns requested
100        let batch = match self.projection.as_ref() {
101            Some(columns) => batch.project(columns)?,
102            None => batch.clone(),
103        };
104
105        let Some(&fetch) = self.fetch.as_ref() else {
106            return Poll::Ready(Some(Ok(batch)));
107        };
108        if fetch == 0 {
109            return Poll::Ready(None);
110        }
111
112        let batch = if batch.num_rows() > fetch {
113            batch.slice(0, fetch)
114        } else {
115            batch
116        };
117        self.fetch = Some(fetch - batch.num_rows());
118        Poll::Ready(Some(Ok(batch)))
119    }
120
121    fn size_hint(&self) -> (usize, Option<usize>) {
122        (self.data.len(), Some(self.data.len()))
123    }
124}
125
126impl RecordBatchStream for MemoryStream {
127    /// Get the schema
128    fn schema(&self) -> SchemaRef {
129        Arc::clone(&self.schema)
130    }
131}
132
133pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
134    /// Generate the next batch, return `None` when no more batches are available
135    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
136}
137
138/// Execution plan for lazy in-memory batches of data
139///
140/// This plan generates output batches lazily, it doesn't have to buffer all batches
141/// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
142pub struct LazyMemoryExec {
143    /// Schema representing the data
144    schema: SchemaRef,
145    /// Functions to generate batches for each partition
146    batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
147    /// Plan properties cache storing equivalence properties, partitioning, and execution mode
148    cache: PlanProperties,
149}
150
151impl LazyMemoryExec {
152    /// Create a new lazy memory execution plan
153    pub fn try_new(
154        schema: SchemaRef,
155        generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
156    ) -> Result<Self> {
157        let cache = PlanProperties::new(
158            EquivalenceProperties::new(Arc::clone(&schema)),
159            Partitioning::RoundRobinBatch(generators.len()),
160            EmissionType::Incremental,
161            Boundedness::Bounded,
162        );
163        Ok(Self {
164            schema,
165            batch_generators: generators,
166            cache,
167        })
168    }
169}
170
171impl fmt::Debug for LazyMemoryExec {
172    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
173        f.debug_struct("LazyMemoryExec")
174            .field("schema", &self.schema)
175            .field("batch_generators", &self.batch_generators)
176            .finish()
177    }
178}
179
180impl DisplayAs for LazyMemoryExec {
181    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
182        match t {
183            DisplayFormatType::Default | DisplayFormatType::Verbose => {
184                write!(
185                    f,
186                    "LazyMemoryExec: partitions={}, batch_generators=[{}]",
187                    self.batch_generators.len(),
188                    self.batch_generators
189                        .iter()
190                        .map(|g| g.read().to_string())
191                        .collect::<Vec<_>>()
192                        .join(", ")
193                )
194            }
195        }
196    }
197}
198
199impl ExecutionPlan for LazyMemoryExec {
200    fn name(&self) -> &'static str {
201        "LazyMemoryExec"
202    }
203
204    fn as_any(&self) -> &dyn Any {
205        self
206    }
207
208    fn schema(&self) -> SchemaRef {
209        Arc::clone(&self.schema)
210    }
211
212    fn properties(&self) -> &PlanProperties {
213        &self.cache
214    }
215
216    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
217        vec![]
218    }
219
220    fn with_new_children(
221        self: Arc<Self>,
222        children: Vec<Arc<dyn ExecutionPlan>>,
223    ) -> Result<Arc<dyn ExecutionPlan>> {
224        if children.is_empty() {
225            Ok(self)
226        } else {
227            internal_err!("Children cannot be replaced in LazyMemoryExec")
228        }
229    }
230
231    fn execute(
232        &self,
233        partition: usize,
234        _context: Arc<TaskContext>,
235    ) -> Result<SendableRecordBatchStream> {
236        if partition >= self.batch_generators.len() {
237            return internal_err!(
238                "Invalid partition {} for LazyMemoryExec with {} partitions",
239                partition,
240                self.batch_generators.len()
241            );
242        }
243
244        Ok(Box::pin(LazyMemoryStream {
245            schema: Arc::clone(&self.schema),
246            generator: Arc::clone(&self.batch_generators[partition]),
247        }))
248    }
249
250    fn statistics(&self) -> Result<Statistics> {
251        Ok(Statistics::new_unknown(&self.schema))
252    }
253}
254
255/// Stream that generates record batches on demand
256pub struct LazyMemoryStream {
257    schema: SchemaRef,
258    /// Generator to produce batches
259    ///
260    /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
261    /// should have a unique `LazyBatchGenerator`. Use RepartitionExec or
262    /// construct multiple `LazyMemoryStream`s during planning to enable
263    /// parallel execution.
264    /// Sharing generators between streams should be used with caution.
265    generator: Arc<RwLock<dyn LazyBatchGenerator>>,
266}
267
268impl Stream for LazyMemoryStream {
269    type Item = Result<RecordBatch>;
270
271    fn poll_next(
272        self: std::pin::Pin<&mut Self>,
273        _: &mut Context<'_>,
274    ) -> Poll<Option<Self::Item>> {
275        let batch = self.generator.write().generate_next_batch();
276
277        match batch {
278            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
279            Ok(None) => Poll::Ready(None),
280            Err(e) => Poll::Ready(Some(Err(e))),
281        }
282    }
283}
284
285impl RecordBatchStream for LazyMemoryStream {
286    fn schema(&self) -> SchemaRef {
287        Arc::clone(&self.schema)
288    }
289}
290
#[cfg(test)]
mod lazy_memory_tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Generator that emits `max_batches` batches of `batch_size` consecutive
    /// integers, continuing the sequence from one batch to the next.
    #[derive(Debug, Clone)]
    struct TestGenerator {
        counter: i64,
        max_batches: i64,
        batch_size: usize,
        schema: SchemaRef,
    }

    impl fmt::Display for TestGenerator {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            let Self {
                counter,
                max_batches,
                batch_size,
                ..
            } = self;
            write!(
                f,
                "TestGenerator: counter={counter}, max_batches={max_batches}, batch_size={batch_size}"
            )
        }
    }

    impl LazyBatchGenerator for TestGenerator {
        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
            if self.counter >= self.max_batches {
                return Ok(None);
            }

            let rows = self.batch_size as i64;
            let first = self.counter * rows;
            let values = Int64Array::from_iter_values(first..first + rows);
            self.counter += 1;

            let batch =
                RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(values)])?;
            Ok(Some(batch))
        }
    }

    /// Schema with a single non-nullable Int64 column named "a"
    fn single_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
    }

    #[tokio::test]
    async fn test_lazy_memory_exec() -> Result<()> {
        let schema = single_column_schema();
        let generator = TestGenerator {
            counter: 0,
            max_batches: 3,
            batch_size: 2,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // The plan reports the schema it was built with
        let plan_schema = exec.schema();
        assert_eq!(plan_schema.fields().len(), 1);
        assert_eq!(plan_schema.field(0).name(), "a");

        // Drain the only partition
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches: Vec<_> = stream.collect::<Vec<_>>().await;

        assert_eq!(batches.len(), 3);

        // Every batch continues the integer sequence of the previous one
        let expected: [&[i64]; 3] = [&[0, 1], &[2, 3], &[4, 5]];
        for (result, want) in batches.iter().zip(expected) {
            let batch = result.as_ref().unwrap();
            let column = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap();
            let got: &[i64] = column.values();
            assert_eq!(got, want);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
        let schema = single_column_schema();
        let generator = TestGenerator {
            counter: 0,
            max_batches: 1,
            batch_size: 1,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // Partitions are 0-indexed, so with one generator only partition 0 exists
        let message = match exec.execute(1, Arc::new(TaskContext::default())) {
            Ok(_) => panic!("executing a partition that does not exist should fail"),
            Err(e) => e.to_string(),
        };
        assert!(message
            .contains("Invalid partition 1 for LazyMemoryExec with 1 partitions"));

        Ok(())
    }
}