// lance_datafusion/utils.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_array::{RecordBatchIterator, RecordBatchReader};
use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
use datafusion_common::DataFusionError;
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use tokio::task::spawn_blocking;

/// Wrap a (potentially blocking) iterator in a [`Stream`] that pulls each
/// item on tokio's blocking thread pool, so the async executor is never
/// stalled by a slow `next()` call.
///
/// The iterator is moved into each `spawn_blocking` call and handed back
/// as the unfold state for the next poll. The stream is fused: once the
/// iterator yields `None`, the stream stays terminated.
///
/// # Panics
///
/// Panics if the blocking task fails to join (i.e. the closure panicked or
/// the runtime is shutting down), propagating the `JoinError` message.
fn background_iterator<I: Iterator + Send + 'static>(iter: I) -> impl Stream<Item = I::Item>
where
    I::Item: Send,
{
    stream::unfold(iter, |mut it| async move {
        let joined = spawn_blocking(move || {
            let item = it.next();
            (item, it)
        })
        .await;
        match joined {
            Ok((Some(item), it)) => Some((item, it)),
            Ok((None, _)) => None,
            // Same behavior as the original `unwrap_or_else`: surface the
            // JoinError's Display text as a panic.
            Err(join_err) => panic!("{}", join_err),
        }
    })
    .fuse()
}

/// Infer the Lance schema from the first batch.
///
/// This will peek the first batch to get the dictionaries for dictionary columns.
///
/// NOTE: this does not validate the schema. For example, for appends the schema
/// should be checked to make sure it matches the existing dataset schema before
/// writing.
pub async fn peek_reader_schema(
    batches: Box<dyn RecordBatchReader + Send>,
) -> Result<(Box<dyn RecordBatchReader + Send>, Schema)> {
    let arrow_schema = batches.schema();
    let (peekable, schema) = spawn_blocking(move || {
        let mut schema: Schema = Schema::try_from(batches.schema().as_ref())?;
        let mut peekable = batches.peekable();
        if let Some(batch) = peekable.peek() {
            if let Ok(b) = batch {
                schema.set_dictionary(b)?;
            } else {
                return Err(Error::from(batch.as_ref().unwrap_err()));
            }
        }
        Ok((peekable, schema))
    })
    .await
    .unwrap()?;
    schema.validate()?;
    let reader = RecordBatchIterator::new(peekable, arrow_schema);
    Ok((
        Box::new(reader) as Box<dyn RecordBatchReader + Send>,
        schema,
    ))
}

/// Convert reader to a stream.
///
/// The reader will be called in a background thread.
/// Convert a [`RecordBatchReader`] into a [`SendableRecordBatchStream`].
///
/// Each batch is pulled from the reader on a background (blocking) thread
/// via [`background_iterator`], and Arrow errors are mapped into
/// [`DataFusionError`] as required by the stream adapter.
pub fn reader_to_stream(batches: Box<dyn RecordBatchReader + Send>) -> SendableRecordBatchStream {
    let schema = batches.schema();
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        background_iterator(batches).map_err(DataFusionError::from),
    ))
}