1pub use async_channel::{TryRecvError, TrySendError};
22
23use crate::metrics::{
24 DROPPED_LABEL, RECEIVED_LABEL, SENT_LABEL, UNBOUNDED_CHANNELS_COUNTER, UNBOUNDED_CHANNELS_SIZE,
25};
26use async_channel::{Receiver, Sender};
27use futures::{
28 stream::{FusedStream, Stream},
29 task::{Context, Poll},
30};
31use log::error;
32use sp_arithmetic::traits::SaturatedConversion;
33use std::{
34 backtrace::Backtrace,
35 pin::Pin,
36 sync::{
37 atomic::{AtomicBool, Ordering},
38 Arc,
39 },
40};
41
42#[derive(Debug)]
45pub struct TracingUnboundedSender<T> {
46 inner: Sender<T>,
47 name: &'static str,
48 queue_size_warning: usize,
49 warning_fired: Arc<AtomicBool>,
50 creation_backtrace: Arc<Backtrace>,
51}
52
53impl<T> Clone for TracingUnboundedSender<T> {
55 fn clone(&self) -> Self {
56 Self {
57 inner: self.inner.clone(),
58 name: self.name,
59 queue_size_warning: self.queue_size_warning,
60 warning_fired: self.warning_fired.clone(),
61 creation_backtrace: self.creation_backtrace.clone(),
62 }
63 }
64}
65
66#[derive(Debug)]
69pub struct TracingUnboundedReceiver<T> {
70 inner: Receiver<T>,
71 name: &'static str,
72}
73
74pub fn tracing_unbounded<T>(
78 name: &'static str,
79 queue_size_warning: usize,
80) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
81 let (s, r) = async_channel::unbounded();
82 let sender = TracingUnboundedSender {
83 inner: s,
84 name,
85 queue_size_warning,
86 warning_fired: Arc::new(AtomicBool::new(false)),
87 creation_backtrace: Arc::new(Backtrace::force_capture()),
88 };
89 let receiver = TracingUnboundedReceiver { inner: r, name: name.into() };
90 (sender, receiver)
91}
92
93impl<T> TracingUnboundedSender<T> {
94 pub fn is_closed(&self) -> bool {
96 self.inner.is_closed()
97 }
98
99 pub fn close(&self) -> bool {
101 self.inner.close()
102 }
103
104 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
106 self.inner.try_send(msg).inspect(|_| {
107 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, SENT_LABEL]).inc();
108 UNBOUNDED_CHANNELS_SIZE
109 .with_label_values(&[self.name])
110 .set(self.inner.len().saturated_into());
111
112 if self.inner.len() >= self.queue_size_warning &&
113 self.warning_fired
114 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
115 .is_ok()
116 {
117 error!(
118 "The number of unprocessed messages in channel `{}` exceeded {}.\n\
119 The channel was created at:\n{}\n
120 Last message was sent from:\n{}",
121 self.name,
122 self.queue_size_warning,
123 self.creation_backtrace,
124 Backtrace::force_capture(),
125 );
126 }
127 })
128 }
129
130 pub fn len(&self) -> usize {
132 self.inner.len()
133 }
134}
135
136impl<T> TracingUnboundedReceiver<T> {
137 pub fn close(&mut self) -> bool {
139 self.inner.close()
140 }
141
142 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
145 self.inner.try_recv().inspect(|_| {
146 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, RECEIVED_LABEL]).inc();
147 UNBOUNDED_CHANNELS_SIZE
148 .with_label_values(&[self.name])
149 .set(self.inner.len().saturated_into());
150 })
151 }
152
153 pub fn len(&self) -> usize {
155 self.inner.len()
156 }
157
158 pub fn name(&self) -> &'static str {
160 self.name
161 }
162}
163
164impl<T> Drop for TracingUnboundedReceiver<T> {
165 fn drop(&mut self) {
166 self.close();
168 let count = self.inner.len();
170 if count > 0 {
172 UNBOUNDED_CHANNELS_COUNTER
173 .with_label_values(&[self.name, DROPPED_LABEL])
174 .inc_by(count.saturated_into());
175 }
176 UNBOUNDED_CHANNELS_SIZE.with_label_values(&[self.name]).set(0);
178 while let Ok(_) = self.inner.try_recv() {}
182 }
183}
184
185impl<T> Unpin for TracingUnboundedReceiver<T> {}
186
187impl<T> Stream for TracingUnboundedReceiver<T> {
188 type Item = T;
189
190 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
191 let s = self.get_mut();
192 match Pin::new(&mut s.inner).poll_next(cx) {
193 Poll::Ready(msg) => {
194 if msg.is_some() {
195 UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, RECEIVED_LABEL]).inc();
196 UNBOUNDED_CHANNELS_SIZE
197 .with_label_values(&[s.name])
198 .set(s.inner.len().saturated_into());
199 }
200 Poll::Ready(msg)
201 },
202 Poll::Pending => Poll::Pending,
203 }
204 }
205}
206
207impl<T> FusedStream for TracingUnboundedReceiver<T> {
208 fn is_terminated(&self) -> bool {
209 self.inner.is_terminated()
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::tracing_unbounded;
216 use async_channel::{self, RecvError, TryRecvError};
217
218 #[test]
219 fn test_tracing_unbounded_receiver_drop() {
220 let (tracing_unbounded_sender, tracing_unbounded_receiver) =
221 tracing_unbounded("test-receiver-drop", 10);
222 let (tx, rx) = async_channel::unbounded::<usize>();
223
224 tracing_unbounded_sender.unbounded_send(tx).unwrap();
225 drop(tracing_unbounded_receiver);
226
227 assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
228 assert_eq!(rx.recv_blocking(), Err(RecvError));
229 }
230}