lance_datafusion/utils.rs

use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use async_trait::async_trait;
use datafusion::{
    execution::RecordBatchStream,
    physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
};
use datafusion_common::DataFusionError;
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use lance_core::datatypes::Schema;
use lance_core::Result;
use tokio::task::{spawn, spawn_blocking};
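
/// Wrap a blocking `Iterator` in a `Stream`, pulling each item with
/// `spawn_blocking` so that a slow `next()` never stalls the async executor.
///
/// Illustrative usage sketch (added for exposition, not part of the original
/// file); assumes a Tokio runtime is driving the stream:
///
/// ```ignore
/// use futures::StreamExt;
///
/// let stream = background_iterator(vec![1, 2, 3].into_iter());
/// let items: Vec<i32> = stream.collect().await;
/// assert_eq!(items, vec![1, 2, 3]);
/// ```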
fn background_iterator<I: Iterator + Send + 'static>(iter: I) -> impl Stream<Item = I::Item>
where
    I::Item: Send,
{
    stream::unfold(iter, |mut iter| {
        spawn_blocking(|| iter.next().map(|val| (val, iter)))
            .unwrap_or_else(|err| panic!("{}", err))
    })
    .fuse()
}
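
/// A source of Arrow record batches, with a known schema, that can be turned
/// into a `SendableRecordBatchStream` for writing.
///
/// Illustrative usage sketch (added for exposition, not part of the original
/// file); `batch` and `arrow_schema` are assumed to be an existing
/// `RecordBatch` and its `SchemaRef`:
///
/// ```ignore
/// let reader = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema);
/// let (stream, lance_schema) = reader.into_stream_and_schema().await?;
/// ```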
#[async_trait]
pub trait StreamingWriteSource: Send {
    /// Convert the source into a stream of record batches plus the inferred
    /// Lance schema, peeking at the first batch to resolve dictionary types.
    async fn into_stream_and_schema(self) -> Result<(SendableRecordBatchStream, Schema)>
    where
        Self: Sized,
    {
        let mut stream = self.into_stream();
        let (stream, arrow_schema, schema) = spawn(async move {
            let arrow_schema = stream.schema();
            let mut schema: Schema = Schema::try_from(arrow_schema.as_ref())?;
            let first_batch = stream.try_next().await?;
            if let Some(batch) = &first_batch {
                schema.set_dictionary(batch)?;
            }
            // Put the consumed first batch back in front of the remaining stream.
            let stream = stream::iter(first_batch.map(Ok)).chain(stream);
            Result::Ok((stream, arrow_schema, schema))
        })
        .await
        .unwrap()?;
        schema.validate()?;
        let adapter = RecordBatchStreamAdapter::new(arrow_schema, stream);
        Ok((Box::pin(adapter), schema))
    }

    /// Returns the Arrow schema of the source.
    fn arrow_schema(&self) -> SchemaRef;

    /// Convert the source into a stream of record batches.
    fn into_stream(self) -> SendableRecordBatchStream;
}

impl StreamingWriteSource for ArrowArrayStreamReader {
    #[inline]
    fn arrow_schema(&self) -> SchemaRef {
        RecordBatchReader::schema(self)
    }

    #[inline]
    fn into_stream(self) -> SendableRecordBatchStream {
        reader_to_stream(Box::new(self))
    }
}

impl<I> StreamingWriteSource for RecordBatchIterator<I>
where
    Self: Send,
    I: IntoIterator<Item = ::core::result::Result<RecordBatch, ArrowError>> + Send + 'static,
{
    #[inline]
    fn arrow_schema(&self) -> SchemaRef {
        RecordBatchReader::schema(self)
    }

    #[inline]
    fn into_stream(self) -> SendableRecordBatchStream {
        reader_to_stream(Box::new(self))
    }
}

impl<T> StreamingWriteSource for Box<T>
where
    T: StreamingWriteSource,
{
    #[inline]
    fn arrow_schema(&self) -> SchemaRef {
        T::arrow_schema(&**self)
    }

    #[inline]
    fn into_stream(self) -> SendableRecordBatchStream {
        T::into_stream(*self)
    }
}

impl StreamingWriteSource for Box<dyn RecordBatchReader + Send> {
    #[inline]
    fn arrow_schema(&self) -> SchemaRef {
        RecordBatchReader::schema(self)
    }

    #[inline]
    fn into_stream(self) -> SendableRecordBatchStream {
        reader_to_stream(self)
    }
}

impl StreamingWriteSource for SendableRecordBatchStream {
    #[inline]
    fn arrow_schema(&self) -> SchemaRef {
        RecordBatchStream::schema(&**self)
    }

    #[inline]
    fn into_stream(self) -> SendableRecordBatchStream {
        self
    }
}
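
/// Convert a (potentially blocking) `RecordBatchReader` into a
/// `SendableRecordBatchStream`, pulling batches on the blocking thread pool
/// via `background_iterator`.
///
/// Illustrative consumption sketch (added for exposition, not part of the
/// original file); `reader` stands in for any `Box<dyn RecordBatchReader + Send>`:
///
/// ```ignore
/// let mut stream = reader_to_stream(reader);
/// while let Some(batch) = stream.try_next().await? {
///     // process `batch` here
/// }
/// ```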
pub fn reader_to_stream(batches: Box<dyn RecordBatchReader + Send>) -> SendableRecordBatchStream {
    let arrow_schema = batches.arrow_schema();
    let stream = RecordBatchStreamAdapter::new(
        arrow_schema,
        background_iterator(batches).map_err(DataFusionError::from),
    );
    Box::pin(stream)
}
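
// Illustrative test sketch (added for exposition, not part of the original
// file). Assumes the crate's dev-dependencies provide `tokio` with the
// `macros` and `rt-multi-thread` features.
#[cfg(test)]
mod utils_example_tests {
    use super::*;
    use arrow_array::Int32Array;
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use std::sync::Arc;

    #[tokio::test]
    async fn reader_to_stream_preserves_batches() {
        // Build a single-column batch and wrap it in a RecordBatchIterator.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "a",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema);

        // Convert the blocking reader into an async stream and drain it.
        let mut stream = reader_to_stream(Box::new(reader));
        let first = stream.try_next().await.unwrap().unwrap();
        assert_eq!(first, batch);
        assert!(stream.try_next().await.unwrap().is_none());
    }
}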