1use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use arrow_array::RecordBatch;
8use arrow_schema::{ArrowError, SchemaRef};
9use futures::stream::BoxStream;
10use futures::{Stream, StreamExt};
11use pin_project::pin_project;
12
13use lance_core::Result;
14
/// A boxed, `'static` stream whose items are `Result<RecordBatch>` using the
/// `lance_core` result type.
pub type BatchStream = BoxStream<'static, Result<RecordBatch>>;
16
17pub fn arrow_stream_to_lance_stream(
18 arrow_stream: BoxStream<'static, std::result::Result<RecordBatch, ArrowError>>,
19) -> BatchStream {
20 arrow_stream.map(|r| r.map_err(Into::into)).boxed()
21}
22
/// A `Send` stream of `Result<RecordBatch>` items that can also report a
/// schema.
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> + Send {
    /// Returns the schema associated with this stream.
    ///
    /// NOTE(review): presumably every yielded batch conforms to this schema;
    /// nothing in this trait enforces that — confirm with implementors.
    fn schema(&self) -> SchemaRef;
}
28
/// Pairs an arbitrary stream with a schema.
///
/// The `Stream` implementation for this type forwards polling to the wrapped
/// stream through the pin projection generated by `#[pin_project]`.
#[pin_project]
pub struct RecordBatchStreamAdapter<S> {
    /// Schema handed out by `RecordBatchStream::schema`.
    schema: SchemaRef,

    /// The wrapped stream; marked `#[pin]` so it can be polled through a
    /// pinned projection.
    #[pin]
    stream: S,
}
38
39impl<S> RecordBatchStreamAdapter<S> {
40 pub fn new(schema: SchemaRef, stream: S) -> Self {
42 Self { schema, stream }
43 }
44}
45
46impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("RecordBatchStreamAdapter")
49 .field("schema", &self.schema)
50 .finish()
51 }
52}
53
54impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
55where
56 S: Stream<Item = Result<RecordBatch>> + Send + 'static,
57{
58 fn schema(&self) -> SchemaRef {
59 self.schema.clone()
60 }
61}
62
63impl<S> Stream for RecordBatchStreamAdapter<S>
64where
65 S: Stream<Item = Result<RecordBatch>>,
66{
67 type Item = Result<RecordBatch>;
68
69 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 self.project().stream.poll_next(cx)
71 }
72
73 fn size_hint(&self) -> (usize, Option<usize>) {
74 self.stream.size_hint()
75 }
76}