datafusion_physical_plan/memory.rs

use std::any::Any;
use std::fmt;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::execution_plan::{Boundedness, EmissionType};
use crate::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    RecordBatchStream, SendableRecordBatchStream, Statistics,
};

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use futures::Stream;
use parking_lot::RwLock;

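/// Stream of in-memory [`RecordBatch`]es that applies an optional projection
/// and an optional row limit (`fetch`) as batches are yielded.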
pub struct MemoryStream {
    /// Vector of record batches to return
    data: Vec<RecordBatch>,
    /// Optional memory reservation for the data, held for the lifetime of the stream
    reservation: Option<MemoryReservation>,
    /// Schema representing the data
    schema: SchemaRef,
    /// Optional column indices to project from each batch
    projection: Option<Vec<usize>>,
    /// Index of the next batch to return
    index: usize,
    /// Maximum number of rows still to be returned; `None` means no limit
    fetch: Option<usize>,
}

impl MemoryStream {
    /// Create a stream over the given record batches, optionally projecting
    /// each batch to the provided column indices.
    pub fn try_new(
        data: Vec<RecordBatch>,
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Ok(Self {
            data,
            reservation: None,
            schema,
            projection,
            index: 0,
            fetch: None,
        })
    }

    /// Attach a memory reservation that is held for the lifetime of the stream.
    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
        self.reservation = Some(reservation);
        self
    }

    /// Limit the total number of rows returned by the stream.
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }
}

impl Stream for MemoryStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.index >= self.data.len() {
            return Poll::Ready(None);
        }
        self.index += 1;
        let batch = &self.data[self.index - 1];
        // Return just the columns requested, if a projection was supplied
        let batch = match self.projection.as_ref() {
            Some(columns) => batch.project(columns)?,
            None => batch.clone(),
        };

        let Some(&fetch) = self.fetch.as_ref() else {
            return Poll::Ready(Some(Ok(batch)));
        };
        if fetch == 0 {
            return Poll::Ready(None);
        }

        // Truncate the batch if it would exceed the remaining row limit,
        // then decrement the limit by the number of rows actually returned
        let batch = if batch.num_rows() > fetch {
            batch.slice(0, fetch)
        } else {
            batch
        };
        self.fetch = Some(fetch - batch.num_rows());
        Poll::Ready(Some(Ok(batch)))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.data.len(), Some(self.data.len()))
    }
}

impl RecordBatchStream for MemoryStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

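/// A source of record batches that are produced lazily, one at a time, when
/// the consuming stream polls for the next batch.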
pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
    /// Generate the next batch, returning `Ok(None)` when no batches remain
    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
}

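/// Execution plan that produces record batches lazily from a set of
/// [`LazyBatchGenerator`]s, one generator per output partition.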
pub struct LazyMemoryExec {
    /// Schema representing the data
    schema: SchemaRef,
    /// Generators that produce the batches for each partition
    batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
    /// Cached plan properties: equivalence properties, partitioning, emission type, and boundedness
    cache: PlanProperties,
}

impl LazyMemoryExec {
    /// Create a new lazy memory execution plan with one partition per generator
    pub fn try_new(
        schema: SchemaRef,
        generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
    ) -> Result<Self> {
        let cache = PlanProperties::new(
            EquivalenceProperties::new(Arc::clone(&schema)),
            Partitioning::RoundRobinBatch(generators.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Ok(Self {
            schema,
            batch_generators: generators,
            cache,
        })
    }
}

impl fmt::Debug for LazyMemoryExec {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("LazyMemoryExec")
            .field("schema", &self.schema)
            .field("batch_generators", &self.batch_generators)
            .finish()
    }
}

impl DisplayAs for LazyMemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "LazyMemoryExec: partitions={}, batch_generators=[{}]",
                    self.batch_generators.len(),
                    self.batch_generators
                        .iter()
                        .map(|g| g.read().to_string())
                        .collect::<Vec<_>>()
                        .join(", ")
                )
            }
        }
    }
}

impl ExecutionPlan for LazyMemoryExec {
    fn name(&self) -> &'static str {
        "LazyMemoryExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.is_empty() {
            Ok(self)
        } else {
            internal_err!("Children cannot be replaced in LazyMemoryExec")
        }
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition >= self.batch_generators.len() {
            return internal_err!(
                "Invalid partition {} for LazyMemoryExec with {} partitions",
                partition,
                self.batch_generators.len()
            );
        }

        Ok(Box::pin(LazyMemoryStream {
            schema: Arc::clone(&self.schema),
            generator: Arc::clone(&self.batch_generators[partition]),
        }))
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }
}

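/// Stream that pulls batches from a shared [`LazyBatchGenerator`]. The
/// generator sits behind an `RwLock`, and a write lock is taken each time the
/// stream is polled for the next batch.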
pub struct LazyMemoryStream {
    schema: SchemaRef,
    /// Generator for this partition's batches; shared via `Arc<RwLock<_>>` so
    /// the plan and the stream can both hold a reference to it
    generator: Arc<RwLock<dyn LazyBatchGenerator>>,
}

impl Stream for LazyMemoryStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let batch = self.generator.write().generate_next_batch();

        match batch {
            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
            Ok(None) => Poll::Ready(None),
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

impl RecordBatchStream for LazyMemoryStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

#[cfg(test)]
mod lazy_memory_tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::StreamExt;

    #[derive(Debug, Clone)]
    struct TestGenerator {
        counter: i64,
        max_batches: i64,
        batch_size: usize,
        schema: SchemaRef,
    }

    impl fmt::Display for TestGenerator {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(
                f,
                "TestGenerator: counter={}, max_batches={}, batch_size={}",
                self.counter, self.max_batches, self.batch_size
            )
        }
    }

    impl LazyBatchGenerator for TestGenerator {
        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
            if self.counter >= self.max_batches {
                return Ok(None);
            }

            let array = Int64Array::from_iter_values(
                (self.counter * self.batch_size as i64)
                    ..(self.counter * self.batch_size as i64 + self.batch_size as i64),
            );
            self.counter += 1;
            Ok(Some(RecordBatch::try_new(
                Arc::clone(&self.schema),
                vec![Arc::new(array)],
            )?))
        }
    }

    #[tokio::test]
    async fn test_lazy_memory_exec() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let generator = TestGenerator {
            counter: 0,
            max_batches: 3,
            batch_size: 2,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // Check the schema exposed by the plan
        assert_eq!(exec.schema().fields().len(), 1);
        assert_eq!(exec.schema().field(0).name(), "a");

        // Execute the single partition and collect all batches
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches: Vec<_> = stream.collect::<Vec<_>>().await;

        assert_eq!(batches.len(), 3);

        // Verify the contents of each generated batch
        let batch0 = batches[0].as_ref().unwrap();
        let array0 = batch0
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(array0.values(), &[0, 1]);

        let batch1 = batches[1].as_ref().unwrap();
        let array1 = batch1
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(array1.values(), &[2, 3]);

        let batch2 = batches[2].as_ref().unwrap();
        let array2 = batch2
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(array2.values(), &[4, 5]);

        Ok(())
    }

    #[tokio::test]
    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let generator = TestGenerator {
            counter: 0,
            max_batches: 1,
            batch_size: 1,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // Request a partition that does not exist; execute should fail
        let result = exec.execute(1, Arc::new(TaskContext::default()));

        assert!(matches!(
            result,
            Err(e) if e.to_string().contains("Invalid partition 1 for LazyMemoryExec with 1 partitions")
        ));

        Ok(())
    }
}