lance_datafusion/
utils.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow::ffi_stream::ArrowArrayStreamReader;
5use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
6use arrow_schema::{ArrowError, SchemaRef};
7use async_trait::async_trait;
8use datafusion::{
9    execution::RecordBatchStream,
10    physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
11};
12use datafusion_common::DataFusionError;
13use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
14use lance_core::datatypes::Schema;
15use lance_core::Result;
16use tokio::task::{spawn, spawn_blocking};
17
18fn background_iterator<I: Iterator + Send + 'static>(iter: I) -> impl Stream<Item = I::Item>
19where
20    I::Item: Send,
21{
22    stream::unfold(iter, |mut iter| {
23        spawn_blocking(|| iter.next().map(|val| (val, iter)))
24            .unwrap_or_else(|err| panic!("{}", err))
25    })
26    .fuse()
27}
28
29/// A trait for [BatchRecord] iterators, readers and streams
30/// that can be converted to a concrete stream type [SendableRecordBatchStream].
31///
32/// This also cam read the schema from the first batch
33/// and then update the schema to reflect the dictionary columns.
34#[async_trait]
35pub trait StreamingWriteSource: Send {
36    /// Infer the Lance schema from the first batch stream.
37    ///
38    /// This will peek the first batch to get the dictionaries for dictionary columns.
39    ///
40    /// NOTE: this does not validate the schema. For example, for appends the schema
41    /// should be checked to make sure it matches the existing dataset schema before
42    /// writing.
43    async fn into_stream_and_schema(self) -> Result<(SendableRecordBatchStream, Schema)>
44    where
45        Self: Sized,
46    {
47        let mut stream = self.into_stream();
48        let (stream, arrow_schema, schema) = spawn(async move {
49            let arrow_schema = stream.schema();
50            let mut schema: Schema = Schema::try_from(arrow_schema.as_ref())?;
51            let first_batch = stream.try_next().await?;
52            if let Some(batch) = &first_batch {
53                schema.set_dictionary(batch)?;
54            }
55            let stream = stream::iter(first_batch.map(Ok)).chain(stream);
56            Result::Ok((stream, arrow_schema, schema))
57        })
58        .await
59        .unwrap()?;
60        schema.validate()?;
61        let adapter = RecordBatchStreamAdapter::new(arrow_schema, stream);
62        Ok((Box::pin(adapter), schema))
63    }
64
65    /// Returns the arrow schema.
66    fn arrow_schema(&self) -> SchemaRef;
67
68    /// Convert to a stream.
69    ///
70    /// The conversion will be conducted in a background thread.
71    fn into_stream(self) -> SendableRecordBatchStream;
72}
73
74impl StreamingWriteSource for ArrowArrayStreamReader {
75    #[inline]
76    fn arrow_schema(&self) -> SchemaRef {
77        RecordBatchReader::schema(self)
78    }
79
80    #[inline]
81    fn into_stream(self) -> SendableRecordBatchStream {
82        reader_to_stream(Box::new(self))
83    }
84}
85
86impl<I> StreamingWriteSource for RecordBatchIterator<I>
87where
88    Self: Send,
89    I: IntoIterator<Item = ::core::result::Result<RecordBatch, ArrowError>> + Send + 'static,
90{
91    #[inline]
92    fn arrow_schema(&self) -> SchemaRef {
93        RecordBatchReader::schema(self)
94    }
95
96    #[inline]
97    fn into_stream(self) -> SendableRecordBatchStream {
98        reader_to_stream(Box::new(self))
99    }
100}
101
102impl<T> StreamingWriteSource for Box<T>
103where
104    T: StreamingWriteSource,
105{
106    #[inline]
107    fn arrow_schema(&self) -> SchemaRef {
108        T::arrow_schema(&**self)
109    }
110
111    #[inline]
112    fn into_stream(self) -> SendableRecordBatchStream {
113        T::into_stream(*self)
114    }
115}
116
117impl StreamingWriteSource for Box<dyn RecordBatchReader + Send> {
118    #[inline]
119    fn arrow_schema(&self) -> SchemaRef {
120        RecordBatchReader::schema(self)
121    }
122
123    #[inline]
124    fn into_stream(self) -> SendableRecordBatchStream {
125        reader_to_stream(self)
126    }
127}
128
129impl StreamingWriteSource for SendableRecordBatchStream {
130    #[inline]
131    fn arrow_schema(&self) -> SchemaRef {
132        RecordBatchStream::schema(&**self)
133    }
134
135    #[inline]
136    fn into_stream(self) -> SendableRecordBatchStream {
137        self
138    }
139}
140
141/// Convert reader to a stream.
142///
143/// The reader will be called in a background thread.
144pub fn reader_to_stream(batches: Box<dyn RecordBatchReader + Send>) -> SendableRecordBatchStream {
145    let arrow_schema = batches.arrow_schema();
146    let stream = RecordBatchStreamAdapter::new(
147        arrow_schema,
148        background_iterator(batches).map_err(DataFusionError::from),
149    );
150    Box::pin(stream)
151}