1use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14 next_message_id: u64,
16 channel: &'a PSB,
18 _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23 Self {
24 next_message_id,
25 channel,
26 _phantom: Default::default(),
27 }
28 }
29
30 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32 SubscriberWaitFuture { subscriber: self }
33 }
34
35 pub async fn next_message_pure(&mut self) -> T {
37 loop {
38 match self.next_message().await {
39 WaitResult::Lagged(_) => continue,
40 WaitResult::Message(message) => break message,
41 }
42 }
43 }
44
45 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50 Poll::Ready(result) => Some(result),
51 Poll::Pending => None,
52 }
53 }
54
55 pub fn try_next_message_pure(&mut self) -> Option<T> {
59 loop {
60 match self.try_next_message() {
61 Some(WaitResult::Lagged(_)) => continue,
62 Some(WaitResult::Message(message)) => break Some(message),
63 None => break None,
64 }
65 }
66 }
67
68 pub fn available(&self) -> u64 {
71 self.channel.available(self.next_message_id)
72 }
73
74 pub fn capacity(&self) -> usize {
76 self.channel.capacity()
77 }
78
79 pub fn free_capacity(&self) -> usize {
83 self.channel.free_capacity()
84 }
85
86 pub fn clear(&self) {
88 self.channel.clear();
89 }
90
91 pub fn len(&self) -> usize {
94 self.channel.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
99 self.channel.is_empty()
100 }
101
102 pub fn is_full(&self) -> bool {
104 self.channel.is_full()
105 }
106}
107
108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
109 fn drop(&mut self) {
110 self.channel.unregister_subscriber(self.next_message_id)
111 }
112}
113
114impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
115
116impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
119 type Item = T;
120
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122 match self
123 .channel
124 .get_message_with_context(&mut self.next_message_id, Some(cx))
125 {
126 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
127 Poll::Ready(WaitResult::Lagged(_)) => {
128 cx.waker().wake_by_ref();
129 Poll::Pending
130 }
131 Poll::Pending => Poll::Pending,
132 }
133 }
134}
135
136pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
138
139impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
140 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
141
142 fn deref(&self) -> &Self::Target {
143 &self.0
144 }
145}
146
147impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
148 fn deref_mut(&mut self) -> &mut Self::Target {
149 &mut self.0
150 }
151}
152
153pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
155 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
156);
157
158impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
159 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
160{
161 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
162
163 fn deref(&self) -> &Self::Target {
164 &self.0
165 }
166}
167
168impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
169 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
170{
171 fn deref_mut(&mut self) -> &mut Self::Target {
172 &mut self.0
173 }
174}
175
176#[must_use = "futures do nothing unless you `.await` or poll them"]
178pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
179 subscriber: &'s mut Sub<'a, PSB, T>,
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
183 type Output = WaitResult<T>;
184
185 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186 self.subscriber
187 .channel
188 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
189 }
190}
191
192impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}