1#[cfg(not(feature = "metered"))]
21mod inner {
22 use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
24 pub type TracingUnboundedSender<T> = UnboundedSender<T>;
25 pub type TracingUnboundedReceiver<T> = UnboundedReceiver<T>;
26
27 pub fn tracing_unbounded<T>(_key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
29 mpsc::unbounded()
30 }
31}
32
33
34#[cfg(feature = "metered")]
35mod inner {
36 use futures::channel::mpsc::{self,
38 UnboundedReceiver, UnboundedSender,
39 TryRecvError, TrySendError, SendError
40 };
41 use futures::{sink::Sink, task::{Poll, Context}, stream::{Stream, FusedStream}};
42 use std::pin::Pin;
43 use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
44
45 #[derive(Debug)]
48 pub struct TracingUnboundedSender<T>(&'static str, UnboundedSender<T>);
49
50 impl<T> Clone for TracingUnboundedSender<T> {
52 fn clone(&self) -> Self {
53 Self(self.0, self.1.clone())
54 }
55 }
56
57 #[derive(Debug)]
60 pub struct TracingUnboundedReceiver<T>(&'static str, UnboundedReceiver<T>);
61
62 pub fn tracing_unbounded<T>(key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
65 let (s, r) = mpsc::unbounded();
66 (TracingUnboundedSender(key, s), TracingUnboundedReceiver(key,r))
67 }
68
69 impl<T> TracingUnboundedSender<T> {
70 pub fn poll_ready(&self, ctx: &mut Context) -> Poll<Result<(), SendError>> {
72 self.1.poll_ready(ctx)
73 }
74
75 pub fn is_closed(&self) -> bool {
77 self.1.is_closed()
78 }
79
80 pub fn close_channel(&self) {
82 self.1.close_channel()
83 }
84
85 pub fn disconnect(&mut self) {
87 self.1.disconnect()
88 }
89
90 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
92 self.1.start_send(msg)
93 }
94
95 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
97 self.1.unbounded_send(msg).map(|s|{
98 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"send"]).inc();
99 s
100 })
101 }
102
103 pub fn same_receiver(&self, other: &UnboundedSender<T>) -> bool {
105 self.1.same_receiver(other)
106 }
107 }
108
109 impl<T> TracingUnboundedReceiver<T> {
110
111 fn consume(&mut self) {
112 let mut count = 0;
114 loop {
115 if self.1.is_terminated() {
116 break;
117 }
118
119 match self.try_next() {
120 Ok(Some(..)) => count += 1,
121 _ => break
122 }
123 }
124 if count > 0 {
126 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"dropped"]).inc_by(count);
127 }
128
129 }
130
131 pub fn close(&mut self) {
134 self.consume();
135 self.1.close()
136 }
137
138 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
141 self.1.try_next().map(|s| {
142 if s.is_some() {
143 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, &"received"]).inc();
144 }
145 s
146 })
147 }
148 }
149
150 impl<T> Drop for TracingUnboundedReceiver<T> {
151 fn drop(&mut self) {
152 self.consume();
153 }
154 }
155
156 impl<T> Unpin for TracingUnboundedReceiver<T> {}
157
158 impl<T> Stream for TracingUnboundedReceiver<T> {
159 type Item = T;
160
161 fn poll_next(
162 self: Pin<&mut Self>,
163 cx: &mut Context<'_>,
164 ) -> Poll<Option<T>> {
165 let s = self.get_mut();
166 match Pin::new(&mut s.1).poll_next(cx) {
167 Poll::Ready(msg) => {
168 if msg.is_some() {
169 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc();
170 }
171 Poll::Ready(msg)
172 }
173 Poll::Pending => {
174 Poll::Pending
175 }
176 }
177 }
178 }
179
180 impl<T> FusedStream for TracingUnboundedReceiver<T> {
181 fn is_terminated(&self) -> bool {
182 self.1.is_terminated()
183 }
184 }
185
186 impl<T> Sink<T> for TracingUnboundedSender<T> {
187 type Error = SendError;
188
189 fn poll_ready(
190 self: Pin<&mut Self>,
191 cx: &mut Context<'_>,
192 ) -> Poll<Result<(), Self::Error>> {
193 TracingUnboundedSender::poll_ready(&*self, cx)
194 }
195
196 fn start_send(
197 mut self: Pin<&mut Self>,
198 msg: T,
199 ) -> Result<(), Self::Error> {
200 TracingUnboundedSender::start_send(&mut *self, msg)
201 }
202
203 fn poll_flush(
204 self: Pin<&mut Self>,
205 _: &mut Context<'_>,
206 ) -> Poll<Result<(), Self::Error>> {
207 Poll::Ready(Ok(()))
208 }
209
210 fn poll_close(
211 mut self: Pin<&mut Self>,
212 _: &mut Context<'_>,
213 ) -> Poll<Result<(), Self::Error>> {
214 self.disconnect();
215 Poll::Ready(Ok(()))
216 }
217 }
218
219 impl<T> Sink<T> for &TracingUnboundedSender<T> {
220 type Error = SendError;
221
222 fn poll_ready(
223 self: Pin<&mut Self>,
224 cx: &mut Context<'_>,
225 ) -> Poll<Result<(), Self::Error>> {
226 TracingUnboundedSender::poll_ready(*self, cx)
227 }
228
229 fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
230 self.unbounded_send(msg)
231 .map_err(TrySendError::into_send_error)
232 }
233
234 fn poll_flush(
235 self: Pin<&mut Self>,
236 _: &mut Context<'_>,
237 ) -> Poll<Result<(), Self::Error>> {
238 Poll::Ready(Ok(()))
239 }
240
241 fn poll_close(
242 self: Pin<&mut Self>,
243 _: &mut Context<'_>,
244 ) -> Poll<Result<(), Self::Error>> {
245 self.close_channel();
246 Poll::Ready(Ok(()))
247 }
248 }
249}
250
251pub use inner::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};