lance_core/utils/
tracing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use futures::Stream;
5use pin_project::pin_project;
6use tracing::Span;
7
8#[pin_project]
9pub struct InstrumentedStream<I: Stream> {
10    #[pin]
11    stream: I,
12    span: Span,
13}
14
15impl<I: Stream> Stream for InstrumentedStream<I> {
16    type Item = I::Item;
17
18    fn poll_next(
19        self: std::pin::Pin<&mut Self>,
20        cx: &mut std::task::Context<'_>,
21    ) -> std::task::Poll<Option<Self::Item>> {
22        let this = self.project();
23        let _guard = this.span.enter();
24        this.stream.poll_next(cx)
25    }
26}
27
// It would be nice to call the method `in_current_span`, but the `Instrument` trait in
// the tracing crate already claims that name for all `Sized` types.
30pub trait StreamTracingExt {
31    /// All calls to poll the stream will be done in the context of the current span (when this method is called)
32    fn stream_in_current_span(self) -> InstrumentedStream<Self>
33    where
34        Self: Stream,
35        Self: Sized;
36}
37
38impl<S: Stream> StreamTracingExt for S {
39    fn stream_in_current_span(self) -> InstrumentedStream<Self>
40    where
41        Self: Stream,
42        Self: Sized,
43    {
44        InstrumentedStream {
45            stream: self,
46            span: Span::current(),
47        }
48    }
49}