1use std::sync::{Arc, Mutex};
7
8use arrow_array::RecordBatch;
9use arrow_schema::Schema as ArrowSchema;
10use datafusion::{
11 catalog::streaming::StreamingTable,
12 dataframe::DataFrame,
13 execution::{
14 context::{SessionConfig, SessionContext},
15 disk_manager::DiskManagerConfig,
16 memory_pool::FairSpillPool,
17 runtime_env::RuntimeEnvBuilder,
18 TaskContext,
19 },
20 physical_plan::{
21 display::DisplayableExecutionPlan,
22 execution_plan::{Boundedness, EmissionType},
23 stream::RecordBatchStreamAdapter,
24 streaming::PartitionStream,
25 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
26 },
27};
28use datafusion_common::{DataFusionError, Statistics};
29use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
30use lazy_static::lazy_static;
31
32use futures::stream;
33use lance_arrow::SchemaExt;
34use lance_core::Result;
35use log::{debug, info, warn};
36
37pub struct OneShotExec {
45 stream: Mutex<Option<SendableRecordBatchStream>>,
46 schema: Arc<ArrowSchema>,
49 properties: PlanProperties,
50}
51
52impl OneShotExec {
53 pub fn new(stream: SendableRecordBatchStream) -> Self {
55 let schema = stream.schema();
56 Self {
57 stream: Mutex::new(Some(stream)),
58 schema: schema.clone(),
59 properties: PlanProperties::new(
60 EquivalenceProperties::new(schema),
61 Partitioning::RoundRobinBatch(1),
62 EmissionType::Incremental,
63 Boundedness::Bounded,
64 ),
65 }
66 }
67
68 pub fn from_batch(batch: RecordBatch) -> Self {
69 let schema = batch.schema();
70 let stream = Box::pin(RecordBatchStreamAdapter::new(
71 schema,
72 stream::iter(vec![Ok(batch)]),
73 ));
74 Self::new(stream)
75 }
76}
77
78impl std::fmt::Debug for OneShotExec {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 let stream = self.stream.lock().unwrap();
81 f.debug_struct("OneShotExec")
82 .field("exhausted", &stream.is_none())
83 .field("schema", self.schema.as_ref())
84 .finish()
85 }
86}
87
88impl DisplayAs for OneShotExec {
89 fn fmt_as(
90 &self,
91 t: datafusion::physical_plan::DisplayFormatType,
92 f: &mut std::fmt::Formatter,
93 ) -> std::fmt::Result {
94 let stream = self.stream.lock().unwrap();
95 match t {
96 DisplayFormatType::Default | DisplayFormatType::Verbose => {
97 let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
98 let columns = self
99 .schema
100 .field_names()
101 .iter()
102 .map(|s| s.to_string())
103 .collect::<Vec<_>>();
104 write!(
105 f,
106 "OneShotStream: {}columns=[{}]",
107 exhausted,
108 columns.join(",")
109 )
110 }
111 }
112 }
113}
114
115impl ExecutionPlan for OneShotExec {
116 fn name(&self) -> &str {
117 "OneShotExec"
118 }
119
120 fn as_any(&self) -> &dyn std::any::Any {
121 self
122 }
123
124 fn schema(&self) -> arrow_schema::SchemaRef {
125 self.schema.clone()
126 }
127
128 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
129 vec![]
130 }
131
132 fn with_new_children(
133 self: Arc<Self>,
134 _children: Vec<Arc<dyn ExecutionPlan>>,
135 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
136 todo!()
137 }
138
139 fn execute(
140 &self,
141 _partition: usize,
142 _context: Arc<datafusion::execution::TaskContext>,
143 ) -> datafusion_common::Result<SendableRecordBatchStream> {
144 let stream = self
145 .stream
146 .lock()
147 .map_err(|err| DataFusionError::Execution(err.to_string()))?
148 .take();
149 if let Some(stream) = stream {
150 Ok(stream)
151 } else {
152 Err(DataFusionError::Execution(
153 "OneShotExec has already been executed".to_string(),
154 ))
155 }
156 }
157
158 fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
159 Ok(Statistics::new_unknown(&self.schema))
160 }
161
162 fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
163 &self.properties
164 }
165}
166
/// Knobs controlling how Lance executes DataFusion plans.
#[derive(Clone, Debug, Default)]
pub struct LanceExecutionOptions {
    /// Request spilling to disk. The effective value is `use_spilling()`, which can still be
    /// overridden by the `LANCE_BYPASS_SPILLING` environment variable.
    pub use_spilling: bool,
    /// Memory pool size in bytes. `None` defers to `LANCE_MEM_POOL_SIZE` or the built-in
    /// default (see `mem_pool_size()`).
    pub mem_pool_size: Option<u64>,
}
172
/// Default memory pool size in bytes (100 MiB), used when neither the options nor
/// `LANCE_MEM_POOL_SIZE` provide one.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 << 20;
174
175impl LanceExecutionOptions {
176 pub fn mem_pool_size(&self) -> u64 {
177 self.mem_pool_size.unwrap_or_else(|| {
178 std::env::var("LANCE_MEM_POOL_SIZE")
179 .map(|s| match s.parse::<u64>() {
180 Ok(v) => v,
181 Err(e) => {
182 warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
183 DEFAULT_LANCE_MEM_POOL_SIZE
184 }
185 })
186 .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
187 })
188 }
189
190 pub fn use_spilling(&self) -> bool {
191 if !self.use_spilling {
192 return false;
193 }
194 std::env::var("LANCE_BYPASS_SPILLING")
195 .map(|_| {
196 info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
197 false
198 })
199 .unwrap_or(true)
200 }
201}
202
203pub fn new_session_context(options: LanceExecutionOptions) -> SessionContext {
204 let session_config = SessionConfig::new();
205 let mut runtime_env_builder = RuntimeEnvBuilder::new();
206 if options.use_spilling() {
207 runtime_env_builder = runtime_env_builder
208 .with_disk_manager(DiskManagerConfig::new())
209 .with_memory_pool(Arc::new(FairSpillPool::new(
210 options.mem_pool_size() as usize
211 )));
212 }
213 let runtime_env = runtime_env_builder.build_arc().unwrap();
214 SessionContext::new_with_config_rt(session_config, runtime_env)
215}
216
217lazy_static! {
218 static ref DEFAULT_SESSION_CONTEXT: SessionContext =
219 new_session_context(LanceExecutionOptions::default());
220 static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
221 new_session_context(LanceExecutionOptions {
222 use_spilling: true,
223 ..Default::default()
224 })
225 };
226}
227
228pub fn get_session_context(options: LanceExecutionOptions) -> SessionContext {
229 let session_ctx: SessionContext;
230 if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE {
231 if options.use_spilling() {
232 session_ctx = DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone();
233 } else {
234 session_ctx = DEFAULT_SESSION_CONTEXT.clone();
235 }
236 } else {
237 session_ctx = new_session_context(options)
238 }
239 session_ctx
240}
241
242pub fn execute_plan(
246 plan: Arc<dyn ExecutionPlan>,
247 options: LanceExecutionOptions,
248) -> Result<SendableRecordBatchStream> {
249 debug!(
250 "Executing plan:\n{}",
251 DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
252 );
253
254 let session_ctx = get_session_context(options);
255
256 assert_eq!(plan.properties().partitioning.partition_count(), 1);
259 Ok(plan.execute(0, session_ctx.task_ctx())?)
260}
261
262pub trait SessionContextExt {
263 fn read_one_shot(
267 &self,
268 data: SendableRecordBatchStream,
269 ) -> datafusion::common::Result<DataFrame>;
270}
271
272struct OneShotPartitionStream {
273 data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
274 schema: Arc<ArrowSchema>,
275}
276
277impl std::fmt::Debug for OneShotPartitionStream {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 let data = self.data.lock().unwrap();
280 f.debug_struct("OneShotPartitionStream")
281 .field("exhausted", &data.is_none())
282 .field("schema", self.schema.as_ref())
283 .finish()
284 }
285}
286
287impl OneShotPartitionStream {
288 fn new(data: SendableRecordBatchStream) -> Self {
289 let schema = data.schema();
290 Self {
291 data: Arc::new(Mutex::new(Some(data))),
292 schema,
293 }
294 }
295}
296
297impl PartitionStream for OneShotPartitionStream {
298 fn schema(&self) -> &arrow_schema::SchemaRef {
299 &self.schema
300 }
301
302 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
303 let mut stream = self.data.lock().unwrap();
304 stream
305 .take()
306 .expect("Attempt to consume a one shot dataframe multiple times")
307 }
308}
309
310impl SessionContextExt for SessionContext {
311 fn read_one_shot(
312 &self,
313 data: SendableRecordBatchStream,
314 ) -> datafusion::common::Result<DataFrame> {
315 let schema = data.schema();
316 let part_stream = Arc::new(OneShotPartitionStream::new(data));
317 let provider = StreamingTable::try_new(schema, vec![part_stream])?;
318 self.read_table(Arc::new(provider))
319 }
320}