1pub use async_channel::{TryRecvError, TrySendError};
21
22use crate::metrics::{
23 DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
24};
25use async_channel::{Receiver, Sender};
26use futures::{
27 stream::{FusedStream, Stream},
28 task::{Context, Poll},
29};
30use log::error;
31use sp_arithmetic::traits::SaturatedConversion;
32use std::{
33 backtrace::Backtrace,
34 pin::Pin,
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 Arc,
38 },
39};
40
/// Sending half of a channel created by [`tracing_unbounded`].
///
/// Wraps an `async_channel::Sender` and carries the extra state used to report metrics and to
/// log a one-time warning when the queue grows past a configured length.
#[derive(Debug)]
pub struct TracingUnboundedSender<T> {
    /// The wrapped channel sender that actually transports the messages.
    inner: Sender<T>,
    /// Channel name; used as the metrics label value and in the queue-size warning.
    name: &'static str,
    /// Queue length at (or above) which the warning is logged.
    queue_size_warning: usize,
    /// Set once the warning has been logged. Clones of the sender share this flag, so the
    /// warning is emitted at most once per channel.
    warning_fired: Arc<AtomicBool>,
    /// Backtrace captured when the channel was created; printed as part of the warning.
    creation_backtrace: Arc<Backtrace>,
}
51
52impl<T> Clone for TracingUnboundedSender<T> {
54 fn clone(&self) -> Self {
55 Self {
56 inner: self.inner.clone(),
57 name: self.name,
58 queue_size_warning: self.queue_size_warning,
59 warning_fired: self.warning_fired.clone(),
60 creation_backtrace: self.creation_backtrace.clone(),
61 }
62 }
63}
64
/// Receiving half of a channel created by [`tracing_unbounded`].
///
/// Wraps an `async_channel::Receiver` and updates the channel metrics whenever a message is
/// taken out of the queue or the receiver is dropped.
#[derive(Debug)]
pub struct TracingUnboundedReceiver<T> {
    /// The wrapped channel receiver that actually yields the messages.
    inner: Receiver<T>,
    /// Channel name; used as the metrics label value.
    name: &'static str,
}
72
73pub fn tracing_unbounded<T>(
77 name: &'static str,
78 queue_size_warning: usize,
79) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
80 let (s, r) = async_channel::unbounded();
81 let sender = TracingUnboundedSender {
82 inner: s,
83 name,
84 queue_size_warning,
85 warning_fired: Arc::new(AtomicBool::new(false)),
86 creation_backtrace: Arc::new(Backtrace::force_capture()),
87 };
88 let receiver = TracingUnboundedReceiver { inner: r, name: name.into() };
89 (sender, receiver)
90}
91
92impl<T> TracingUnboundedSender<T> {
93 pub fn is_closed(&self) -> bool {
95 self.inner.is_closed()
96 }
97
98 pub fn close(&self) -> bool {
100 self.inner.close()
101 }
102
103 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
105 self.inner.try_send(msg).inspect(|_| {
106 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
107 UNBOUNDED_CHANNELS_SIZE
108 .with_label_values(&[self.name])
109 .set(self.inner.len().saturated_into());
110
111 if self.inner.len() >= self.queue_size_warning &&
112 self.warning_fired
113 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
114 .is_ok()
115 {
116 error!(
117 "The number of unprocessed messages in channel `{}` exceeded {}.\n\
118 The channel was created at:\n{}\n
119 Last message was sent from:\n{}",
120 self.name,
121 self.queue_size_warning,
122 self.creation_backtrace,
123 Backtrace::force_capture(),
124 );
125 }
126 })
127 }
128
129 pub fn len(&self) -> usize {
131 self.inner.len()
132 }
133}
134
135impl<T> TracingUnboundedReceiver<T> {
136 pub fn close(&mut self) -> bool {
138 self.inner.close()
139 }
140
141 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
144 self.inner.try_recv().inspect(|_| {
145 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
146 UNBOUNDED_CHANNELS_SIZE
147 .with_label_values(&[self.name])
148 .set(self.inner.len().saturated_into());
149 })
150 }
151
152 pub fn len(&self) -> usize {
154 self.inner.len()
155 }
156
157 pub fn name(&self) -> &'static str {
159 self.name
160 }
161}
162
163impl<T> Drop for TracingUnboundedReceiver<T> {
164 fn drop(&mut self) {
165 self.close();
167 let count = self.inner.len();
169 if count > 0 {
171 UNBOUNDED_CHANNELS_COUNTER
172 .with_label_values(&[self.name, DROPPED_LABEL])
173 .inc_by(count.saturated_into());
174 }
175 UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
177 while let Ok(_) = self.inner.try_recv() {}
181 }
182}
183
// The `Stream` implementation calls `Pin::get_mut`, which requires `Self: Unpin`; this impl
// provides that for every message type `T`.
impl<T> Unpin for TracingUnboundedReceiver<T> {}
185
186impl<T> Stream for TracingUnboundedReceiver<T> {
187 type Item = T;
188
189 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
190 let s = self.get_mut();
191 match Pin::new(&mut s.inner).poll_next(cx) {
192 Poll::Ready(msg) => {
193 if msg.is_some() {
194 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
195 UNBOUNDED_CHANNELS_SIZE
196 .with_label_values(&[s.name])
197 .set(s.inner.len().saturated_into());
198 }
199 Poll::Ready(msg)
200 },
201 Poll::Pending => Poll::Pending,
202 }
203 }
204}
205
impl<T> FusedStream for TracingUnboundedReceiver<T> {
    /// Reports whether the stream has terminated, as answered by the inner receiver.
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}
211
#[cfg(test)]
mod tests {
    use super::tracing_unbounded;
    use async_channel::{self, RecvError, TryRecvError};

    /// Dropping the tracing receiver must also drop the messages still queued in the channel.
    #[test]
    fn test_tracing_unbounded_receiver_drop() {
        let (outer_tx, outer_rx) = tracing_unbounded("test-receiver-drop", 10);
        let (inner_tx, inner_rx) = async_channel::unbounded::<usize>();

        // Queue the only sender of the inner channel as a message of the outer channel, then
        // drop the outer receiver without ever reading it.
        outer_tx.unbounded_send(inner_tx).unwrap();
        drop(outer_rx);

        // If the queued sender was dropped with the receiver, the inner channel is closed.
        assert_eq!(inner_rx.try_recv(), Err(TryRecvError::Closed));
        assert_eq!(inner_rx.recv_blocking(), Err(RecvError));
    }
}