lance_io/
stream.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use arrow_array::RecordBatch;
8use arrow_schema::{ArrowError, SchemaRef};
9use futures::stream::BoxStream;
10use futures::{Stream, StreamExt};
11use pin_project::pin_project;
12
13use lance_core::Result;
14
/// A boxed, `'static` stream of [`RecordBatch`] results that uses the Lance [`Result`] type.
pub type BatchStream = BoxStream<'static, Result<RecordBatch>>;
16
17pub fn arrow_stream_to_lance_stream(
18    arrow_stream: BoxStream<'static, std::result::Result<RecordBatch, ArrowError>>,
19) -> BatchStream {
20    arrow_stream.map(|r| r.map_err(Into::into)).boxed()
21}
22
/// A [`Stream`] of `Result<RecordBatch>` items that can also report a schema.
///
/// Implementors must be [`Send`].
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> + Send {
    /// Returns the schema of the stream.
    fn schema(&self) -> SchemaRef;
}
28
/// Combines a [`Stream`] with a [`SchemaRef`], implementing
/// [`RecordBatchStream`] for the combination.
#[pin_project]
pub struct RecordBatchStreamAdapter<S> {
    /// Schema reported by [`RecordBatchStream::schema`].
    schema: SchemaRef,

    /// The wrapped stream; marked `#[pin]` so `poll_next` can poll it through
    /// the pinned projection.
    #[pin]
    stream: S,
}
38
39impl<S> RecordBatchStreamAdapter<S> {
40    /// Creates a new [`RecordBatchStreamAdapter`] from the provided schema and stream
41    pub fn new(schema: SchemaRef, stream: S) -> Self {
42        Self { schema, stream }
43    }
44}
45
46impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("RecordBatchStreamAdapter")
49            .field("schema", &self.schema)
50            .finish()
51    }
52}
53
54impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
55where
56    S: Stream<Item = Result<RecordBatch>> + Send + 'static,
57{
58    fn schema(&self) -> SchemaRef {
59        self.schema.clone()
60    }
61}
62
63impl<S> Stream for RecordBatchStreamAdapter<S>
64where
65    S: Stream<Item = Result<RecordBatch>>,
66{
67    type Item = Result<RecordBatch>;
68
69    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        self.project().stream.poll_next(cx)
71    }
72
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        self.stream.size_hint()
75    }
76}