lance_core/utils/
tracing.rs1use futures::Stream;
5use pin_project::pin_project;
6use tracing::Span;
7
8#[pin_project]
9pub struct InstrumentedStream<I: Stream> {
10 #[pin]
11 stream: I,
12 span: Span,
13}
14
15impl<I: Stream> Stream for InstrumentedStream<I> {
16 type Item = I::Item;
17
18 fn poll_next(
19 self: std::pin::Pin<&mut Self>,
20 cx: &mut std::task::Context<'_>,
21 ) -> std::task::Poll<Option<Self::Item>> {
22 let this = self.project();
23 let _guard = this.span.enter();
24 this.stream.poll_next(cx)
25 }
26}
27
28pub trait StreamTracingExt {
31 fn stream_in_current_span(self) -> InstrumentedStream<Self>
33 where
34 Self: Stream,
35 Self: Sized;
36}
37
38impl<S: Stream> StreamTracingExt for S {
39 fn stream_in_current_span(self) -> InstrumentedStream<Self>
40 where
41 Self: Stream,
42 Self: Sized,
43 {
44 InstrumentedStream {
45 stream: self,
46 span: Span::current(),
47 }
48 }
49}