embassy_sync/pubsub/
subscriber.rs

//! Implementation of everything directly related to subscribers
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
14    /// The message id of the next message we are yet to receive
15    next_message_id: u64,
16    /// The channel we are a subscriber to
17    channel: &'a PSB,
18    _phantom: PhantomData<T>,
19}
20
21impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
22    pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
23        Self {
24            next_message_id,
25            channel,
26            _phantom: Default::default(),
27        }
28    }
29
30    /// Wait for a published message
31    pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
32        SubscriberWaitFuture { subscriber: self }
33    }
34
35    /// Wait for a published message (ignoring lag results)
36    pub async fn next_message_pure(&mut self) -> T {
37        loop {
38            match self.next_message().await {
39                WaitResult::Lagged(_) => continue,
40                WaitResult::Message(message) => break message,
41            }
42        }
43    }
44
45    /// Try to see if there's a published message we haven't received yet.
46    ///
47    /// This function does not peek. The message is received if there is one.
48    pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
49        match self.channel.get_message_with_context(&mut self.next_message_id, None) {
50            Poll::Ready(result) => Some(result),
51            Poll::Pending => None,
52        }
53    }
54
55    /// Try to see if there's a published message we haven't received yet (ignoring lag results).
56    ///
57    /// This function does not peek. The message is received if there is one.
58    pub fn try_next_message_pure(&mut self) -> Option<T> {
59        loop {
60            match self.try_next_message() {
61                Some(WaitResult::Lagged(_)) => continue,
62                Some(WaitResult::Message(message)) => break Some(message),
63                None => break None,
64            }
65        }
66    }
67
68    /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
69    /// for this subscriber.
70    pub fn available(&self) -> u64 {
71        self.channel.available(self.next_message_id)
72    }
73
74    /// Returns the maximum number of elements the ***channel*** can hold.
75    pub fn capacity(&self) -> usize {
76        self.channel.capacity()
77    }
78
79    /// Returns the free capacity of the ***channel***.
80    ///
81    /// This is equivalent to `capacity() - len()`
82    pub fn free_capacity(&self) -> usize {
83        self.channel.free_capacity()
84    }
85
86    /// Clears all elements in the ***channel***.
87    pub fn clear(&self) {
88        self.channel.clear();
89    }
90
91    /// Returns the number of elements currently in the ***channel***.
92    /// See [Self::available] for how many messages are available for this subscriber.
93    pub fn len(&self) -> usize {
94        self.channel.len()
95    }
96
97    /// Returns whether the ***channel*** is empty.
98    pub fn is_empty(&self) -> bool {
99        self.channel.is_empty()
100    }
101
102    /// Returns whether the ***channel*** is full.
103    pub fn is_full(&self) -> bool {
104        self.channel.is_full()
105    }
106}
107
108impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
109    fn drop(&mut self) {
110        self.channel.unregister_subscriber(self.next_message_id)
111    }
112}
113
114impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
115
116/// Warning: The stream implementation ignores lag results and returns all messages.
117/// This might miss some messages without you knowing it.
118impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_util::Stream for Sub<'a, PSB, T> {
119    type Item = T;
120
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        match self
123            .channel
124            .get_message_with_context(&mut self.next_message_id, Some(cx))
125        {
126            Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
127            Poll::Ready(WaitResult::Lagged(_)) => {
128                cx.waker().wake_by_ref();
129                Poll::Pending
130            }
131            Poll::Pending => Poll::Pending,
132        }
133    }
134}
135
136/// A subscriber that holds a dynamic reference to the channel
137pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
138
139impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
140    type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
141
142    fn deref(&self) -> &Self::Target {
143        &self.0
144    }
145}
146
147impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
148    fn deref_mut(&mut self) -> &mut Self::Target {
149        &mut self.0
150    }
151}
152
153/// A subscriber that holds a generic reference to the channel
154pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
155    pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
156);
157
158impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
159    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
160{
161    type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
162
163    fn deref(&self) -> &Self::Target {
164        &self.0
165    }
166}
167
168impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
169    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
170{
171    fn deref_mut(&mut self) -> &mut Self::Target {
172        &mut self.0
173    }
174}
175
176/// Future for the subscriber wait action
177#[must_use = "futures do nothing unless you `.await` or poll them"]
178pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
179    subscriber: &'s mut Sub<'a, PSB, T>,
180}
181
182impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
183    type Output = WaitResult<T>;
184
185    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
186        self.subscriber
187            .channel
188            .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
189    }
190}
191
192impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}