lance_datafusion/exec.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for working with datafusion execution plans
5
6use std::sync::{Arc, Mutex};
7
8use arrow_array::RecordBatch;
9use arrow_schema::Schema as ArrowSchema;
10use datafusion::{
11    catalog::streaming::StreamingTable,
12    dataframe::DataFrame,
13    execution::{
14        context::{SessionConfig, SessionContext},
15        disk_manager::DiskManagerConfig,
16        memory_pool::FairSpillPool,
17        runtime_env::RuntimeEnvBuilder,
18        TaskContext,
19    },
20    physical_plan::{
21        display::DisplayableExecutionPlan,
22        execution_plan::{Boundedness, EmissionType},
23        stream::RecordBatchStreamAdapter,
24        streaming::PartitionStream,
25        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
26    },
27};
28use datafusion_common::{DataFusionError, Statistics};
29use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
30use lazy_static::lazy_static;
31
32use futures::stream;
33use lance_arrow::SchemaExt;
34use lance_core::Result;
35use log::{debug, info, warn};
36
37/// An source execution node created from an existing stream
38///
39/// It can only be used once, and will return the stream.  After that the node
40/// is exhausted.
41///
42/// Note: the stream should be finite, otherwise we will report datafusion properties
43/// incorrectly.
44pub struct OneShotExec {
45    stream: Mutex<Option<SendableRecordBatchStream>>,
46    // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
47    // can still function after exhausted
48    schema: Arc<ArrowSchema>,
49    properties: PlanProperties,
50}
51
52impl OneShotExec {
53    /// Create a new instance from a given stream
54    pub fn new(stream: SendableRecordBatchStream) -> Self {
55        let schema = stream.schema();
56        Self {
57            stream: Mutex::new(Some(stream)),
58            schema: schema.clone(),
59            properties: PlanProperties::new(
60                EquivalenceProperties::new(schema),
61                Partitioning::RoundRobinBatch(1),
62                EmissionType::Incremental,
63                Boundedness::Bounded,
64            ),
65        }
66    }
67
68    pub fn from_batch(batch: RecordBatch) -> Self {
69        let schema = batch.schema();
70        let stream = Box::pin(RecordBatchStreamAdapter::new(
71            schema,
72            stream::iter(vec![Ok(batch)]),
73        ));
74        Self::new(stream)
75    }
76}
77
78impl std::fmt::Debug for OneShotExec {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        let stream = self.stream.lock().unwrap();
81        f.debug_struct("OneShotExec")
82            .field("exhausted", &stream.is_none())
83            .field("schema", self.schema.as_ref())
84            .finish()
85    }
86}
87
88impl DisplayAs for OneShotExec {
89    fn fmt_as(
90        &self,
91        t: datafusion::physical_plan::DisplayFormatType,
92        f: &mut std::fmt::Formatter,
93    ) -> std::fmt::Result {
94        let stream = self.stream.lock().unwrap();
95        match t {
96            DisplayFormatType::Default | DisplayFormatType::Verbose => {
97                let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
98                let columns = self
99                    .schema
100                    .field_names()
101                    .iter()
102                    .map(|s| s.to_string())
103                    .collect::<Vec<_>>();
104                write!(
105                    f,
106                    "OneShotStream: {}columns=[{}]",
107                    exhausted,
108                    columns.join(",")
109                )
110            }
111        }
112    }
113}
114
115impl ExecutionPlan for OneShotExec {
116    fn name(&self) -> &str {
117        "OneShotExec"
118    }
119
120    fn as_any(&self) -> &dyn std::any::Any {
121        self
122    }
123
124    fn schema(&self) -> arrow_schema::SchemaRef {
125        self.schema.clone()
126    }
127
128    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
129        vec![]
130    }
131
132    fn with_new_children(
133        self: Arc<Self>,
134        _children: Vec<Arc<dyn ExecutionPlan>>,
135    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
136        todo!()
137    }
138
139    fn execute(
140        &self,
141        _partition: usize,
142        _context: Arc<datafusion::execution::TaskContext>,
143    ) -> datafusion_common::Result<SendableRecordBatchStream> {
144        let stream = self
145            .stream
146            .lock()
147            .map_err(|err| DataFusionError::Execution(err.to_string()))?
148            .take();
149        if let Some(stream) = stream {
150            Ok(stream)
151        } else {
152            Err(DataFusionError::Execution(
153                "OneShotExec has already been executed".to_string(),
154            ))
155        }
156    }
157
158    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
159        Ok(Statistics::new_unknown(&self.schema))
160    }
161
162    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
163        &self.properties
164    }
165}
166
/// Options controlling how Lance configures a DataFusion session.
#[derive(Clone, Debug, Default)]
pub struct LanceExecutionOptions {
    /// Request a spill-capable runtime (disk manager plus a fair spill memory
    /// pool). The `LANCE_BYPASS_SPILLING` environment variable can still
    /// disable it at runtime.
    pub use_spilling: bool,
    /// Memory pool size, in bytes, used when spilling is enabled. When `None`,
    /// the size comes from `LANCE_MEM_POOL_SIZE` or a built-in default.
    pub mem_pool_size: Option<u64>,
}
172
/// Default memory pool size (100 MiB), used when neither an explicit
/// `mem_pool_size` nor a parseable `LANCE_MEM_POOL_SIZE` is available.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 << 20;
174
175impl LanceExecutionOptions {
176    pub fn mem_pool_size(&self) -> u64 {
177        self.mem_pool_size.unwrap_or_else(|| {
178            std::env::var("LANCE_MEM_POOL_SIZE")
179                .map(|s| match s.parse::<u64>() {
180                    Ok(v) => v,
181                    Err(e) => {
182                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
183                        DEFAULT_LANCE_MEM_POOL_SIZE
184                    }
185                })
186                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
187        })
188    }
189
190    pub fn use_spilling(&self) -> bool {
191        if !self.use_spilling {
192            return false;
193        }
194        std::env::var("LANCE_BYPASS_SPILLING")
195            .map(|_| {
196                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
197                false
198            })
199            .unwrap_or(true)
200    }
201}
202
203pub fn new_session_context(options: LanceExecutionOptions) -> SessionContext {
204    let session_config = SessionConfig::new();
205    let mut runtime_env_builder = RuntimeEnvBuilder::new();
206    if options.use_spilling() {
207        runtime_env_builder = runtime_env_builder
208            .with_disk_manager(DiskManagerConfig::new())
209            .with_memory_pool(Arc::new(FairSpillPool::new(
210                options.mem_pool_size() as usize
211            )));
212    }
213    let runtime_env = runtime_env_builder.build_arc().unwrap();
214    SessionContext::new_with_config_rt(session_config, runtime_env)
215}
216
217lazy_static! {
218    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
219        new_session_context(LanceExecutionOptions::default());
220    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
221        new_session_context(LanceExecutionOptions {
222            use_spilling: true,
223            ..Default::default()
224        })
225    };
226}
227
228pub fn get_session_context(options: LanceExecutionOptions) -> SessionContext {
229    let session_ctx: SessionContext;
230    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE {
231        if options.use_spilling() {
232            session_ctx = DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone();
233        } else {
234            session_ctx = DEFAULT_SESSION_CONTEXT.clone();
235        }
236    } else {
237        session_ctx = new_session_context(options)
238    }
239    session_ctx
240}
241
242/// Executes a plan using default session & runtime configuration
243///
244/// Only executes a single partition.  Panics if the plan has more than one partition.
245pub fn execute_plan(
246    plan: Arc<dyn ExecutionPlan>,
247    options: LanceExecutionOptions,
248) -> Result<SendableRecordBatchStream> {
249    debug!(
250        "Executing plan:\n{}",
251        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
252    );
253
254    let session_ctx = get_session_context(options);
255
256    // NOTE: we are only executing the first partition here. Therefore, if
257    // the plan has more than one partition, we will be missing data.
258    assert_eq!(plan.properties().partitioning.partition_count(), 1);
259    Ok(plan.execute(0, session_ctx.task_ctx())?)
260}
261
262pub trait SessionContextExt {
263    /// Creates a DataFrame for reading a stream of data
264    ///
265    /// This dataframe may only be queried once, future queries will fail
266    fn read_one_shot(
267        &self,
268        data: SendableRecordBatchStream,
269    ) -> datafusion::common::Result<DataFrame>;
270}
271
272struct OneShotPartitionStream {
273    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
274    schema: Arc<ArrowSchema>,
275}
276
277impl std::fmt::Debug for OneShotPartitionStream {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        let data = self.data.lock().unwrap();
280        f.debug_struct("OneShotPartitionStream")
281            .field("exhausted", &data.is_none())
282            .field("schema", self.schema.as_ref())
283            .finish()
284    }
285}
286
287impl OneShotPartitionStream {
288    fn new(data: SendableRecordBatchStream) -> Self {
289        let schema = data.schema();
290        Self {
291            data: Arc::new(Mutex::new(Some(data))),
292            schema,
293        }
294    }
295}
296
297impl PartitionStream for OneShotPartitionStream {
298    fn schema(&self) -> &arrow_schema::SchemaRef {
299        &self.schema
300    }
301
302    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
303        let mut stream = self.data.lock().unwrap();
304        stream
305            .take()
306            .expect("Attempt to consume a one shot dataframe multiple times")
307    }
308}
309
310impl SessionContextExt for SessionContext {
311    fn read_one_shot(
312        &self,
313        data: SendableRecordBatchStream,
314    ) -> datafusion::common::Result<DataFrame> {
315        let schema = data.schema();
316        let part_stream = Arc::new(OneShotPartitionStream::new(data));
317        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
318        self.read_table(Arc::new(provider))
319    }
320}