embassy_sync/pubsub/
mod.rs

1//! Implementation of [PubSubChannel], a queue where published messages get received by all subscribers.
2
3#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
23/// A broadcast channel implementation where multiple publishers can send messages to multiple subscribers
24///
25/// Any published message can be read by all subscribers.
26/// A publisher can choose how it sends its message.
27///
28/// - With [Pub::publish()] the publisher has to wait until there is space in the internal message queue.
29/// - With [Pub::publish_immediate()] the publisher doesn't await and instead lets the oldest message
30///   in the queue drop if necessary. This will cause any [Subscriber] that missed the message to receive
31///   an error to indicate that it has lagged.
32///
33/// ## Example
34///
35/// ```
36/// # use embassy_sync::blocking_mutex::raw::NoopRawMutex;
37/// # use embassy_sync::pubsub::WaitResult;
38/// # use embassy_sync::pubsub::PubSubChannel;
39/// # use futures_executor::block_on;
40/// # let test = async {
41/// // Create the channel. This can be static as well
42/// let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
43///
44/// // This is a generic subscriber with a direct reference to the channel
45/// let mut sub0 = channel.subscriber().unwrap();
46/// // This is a dynamic subscriber with a dynamic (trait object) reference to the channel
47/// let mut sub1 = channel.dyn_subscriber().unwrap();
48///
49/// let pub0 = channel.publisher().unwrap();
50///
51/// // Publish a message, but wait if the queue is full
52/// pub0.publish(42).await;
53///
54/// // Publish a message, but if the queue is full, just kick out the oldest message.
55/// // This may cause some subscribers to miss a message
56/// pub0.publish_immediate(43);
57///
58/// // Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
59/// assert_eq!(sub0.next_message().await, WaitResult::Message(42));
60/// assert_eq!(sub1.next_message().await, WaitResult::Message(42));
61///
62/// // Wait again, but this time ignore any Lag results
63/// assert_eq!(sub0.next_message_pure().await, 43);
64/// assert_eq!(sub1.next_message_pure().await, 43);
65///
66/// // There's also a polling interface
67/// assert_eq!(sub0.try_next_message(), None);
68/// assert_eq!(sub1.try_next_message(), None);
69/// # };
70/// #
71/// # block_on(test);
72/// ```
73///
74pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
75    inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
76}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79    PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81    /// Create a new channel
82    pub const fn new() -> Self {
83        Self {
84            inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85        }
86    }
87
88    /// Create a new subscriber. It will only receive messages that are published after its creation.
89    ///
90    /// If there are no subscriber slots left, an error will be returned.
91    pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92        self.inner.lock(|inner| {
93            let mut s = inner.borrow_mut();
94
95            if s.subscriber_count >= SUBS {
96                Err(Error::MaximumSubscribersReached)
97            } else {
98                s.subscriber_count += 1;
99                Ok(Subscriber(Sub::new(s.next_message_id, self)))
100            }
101        })
102    }
103
104    /// Create a new subscriber. It will only receive messages that are published after its creation.
105    ///
106    /// If there are no subscriber slots left, an error will be returned.
107    pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108        self.inner.lock(|inner| {
109            let mut s = inner.borrow_mut();
110
111            if s.subscriber_count >= SUBS {
112                Err(Error::MaximumSubscribersReached)
113            } else {
114                s.subscriber_count += 1;
115                Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116            }
117        })
118    }
119
120    /// Create a new publisher
121    ///
122    /// If there are no publisher slots left, an error will be returned.
123    pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124        self.inner.lock(|inner| {
125            let mut s = inner.borrow_mut();
126
127            if s.publisher_count >= PUBS {
128                Err(Error::MaximumPublishersReached)
129            } else {
130                s.publisher_count += 1;
131                Ok(Publisher(Pub::new(self)))
132            }
133        })
134    }
135
136    /// Create a new publisher
137    ///
138    /// If there are no publisher slots left, an error will be returned.
139    pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140        self.inner.lock(|inner| {
141            let mut s = inner.borrow_mut();
142
143            if s.publisher_count >= PUBS {
144                Err(Error::MaximumPublishersReached)
145            } else {
146                s.publisher_count += 1;
147                Ok(DynPublisher(Pub::new(self)))
148            }
149        })
150    }
151
152    /// Create a new publisher that can only send immediate messages.
153    /// This kind of publisher does not take up a publisher slot.
154    pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155        ImmediatePublisher(ImmediatePub::new(self))
156    }
157
158    /// Create a new publisher that can only send immediate messages.
159    /// This kind of publisher does not take up a publisher slot.
160    pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161        DynImmediatePublisher(ImmediatePub::new(self))
162    }
163
164    /// Returns the maximum number of elements the channel can hold.
165    pub const fn capacity(&self) -> usize {
166        CAP
167    }
168
169    /// Returns the free capacity of the channel.
170    ///
171    /// This is equivalent to `capacity() - len()`
172    pub fn free_capacity(&self) -> usize {
173        CAP - self.len()
174    }
175
176    /// Clears all elements in the channel.
177    pub fn clear(&self) {
178        self.inner.lock(|inner| inner.borrow_mut().clear());
179    }
180
181    /// Returns the number of elements currently in the channel.
182    pub fn len(&self) -> usize {
183        self.inner.lock(|inner| inner.borrow().len())
184    }
185
186    /// Returns whether the channel is empty.
187    pub fn is_empty(&self) -> bool {
188        self.inner.lock(|inner| inner.borrow().is_empty())
189    }
190
191    /// Returns whether the channel is full.
192    pub fn is_full(&self) -> bool {
193        self.inner.lock(|inner| inner.borrow().is_full())
194    }
195}
196
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
198    for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{
200    fn publish_immediate(&self, message: T) {
201        self.inner.lock(|s| {
202            let mut s = s.borrow_mut();
203            s.publish_immediate(message)
204        })
205    }
206
207    fn capacity(&self) -> usize {
208        self.capacity()
209    }
210
211    fn is_full(&self) -> bool {
212        self.is_full()
213    }
214}
215
216impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
217    for PubSubChannel<M, T, CAP, SUBS, PUBS>
218{
219    fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
220        self.inner.lock(|s| {
221            let mut s = s.borrow_mut();
222
223            // Check if we can read a message
224            match s.get_message(*next_message_id) {
225                // Yes, so we are done polling
226                Some(WaitResult::Message(message)) => {
227                    *next_message_id += 1;
228                    Poll::Ready(WaitResult::Message(message))
229                }
230                // No, so we need to reregister our waker and sleep again
231                None => {
232                    if let Some(cx) = cx {
233                        s.subscriber_wakers.register(cx.waker());
234                    }
235                    Poll::Pending
236                }
237                // We missed a couple of messages. We must do our internal bookkeeping and return that we lagged
238                Some(WaitResult::Lagged(amount)) => {
239                    *next_message_id += amount;
240                    Poll::Ready(WaitResult::Lagged(amount))
241                }
242            }
243        })
244    }
245
246    fn available(&self, next_message_id: u64) -> u64 {
247        self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
248    }
249
250    fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
251        self.inner.lock(|s| {
252            let mut s = s.borrow_mut();
253            // Try to publish the message
254            match s.try_publish(message) {
255                // We did it, we are ready
256                Ok(()) => Ok(()),
257                // The queue is full, so we need to reregister our waker and go to sleep
258                Err(message) => {
259                    if let Some(cx) = cx {
260                        s.publisher_wakers.register(cx.waker());
261                    }
262                    Err(message)
263                }
264            }
265        })
266    }
267
268    fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
269        self.inner.lock(|s| {
270            let mut s = s.borrow_mut();
271            s.unregister_subscriber(subscriber_next_message_id)
272        })
273    }
274
275    fn unregister_publisher(&self) {
276        self.inner.lock(|s| {
277            let mut s = s.borrow_mut();
278            s.unregister_publisher()
279        })
280    }
281
282    fn free_capacity(&self) -> usize {
283        self.free_capacity()
284    }
285
286    fn clear(&self) {
287        self.clear();
288    }
289
290    fn len(&self) -> usize {
291        self.len()
292    }
293
294    fn is_empty(&self) -> bool {
295        self.is_empty()
296    }
297}
298
299/// Internal state for the PubSub channel
300struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
301    /// The queue contains the last messages that have been published and a countdown of how many subscribers are yet to read it
302    queue: Deque<(T, usize), CAP>,
303    /// Every message has an id.
304    /// Don't worry, we won't run out.
305    /// If a million messages were published every second, then the ID's would run out in about 584942 years.
306    next_message_id: u64,
307    /// Collection of wakers for Subscribers that are waiting.  
308    subscriber_wakers: MultiWakerRegistration<SUBS>,
309    /// Collection of wakers for Publishers that are waiting.  
310    publisher_wakers: MultiWakerRegistration<PUBS>,
311    /// The amount of subscribers that are active
312    subscriber_count: usize,
313    /// The amount of publishers that are active
314    publisher_count: usize,
315}
316
317impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
318    /// Create a new internal channel state
319    const fn new() -> Self {
320        Self {
321            queue: Deque::new(),
322            next_message_id: 0,
323            subscriber_wakers: MultiWakerRegistration::new(),
324            publisher_wakers: MultiWakerRegistration::new(),
325            subscriber_count: 0,
326            publisher_count: 0,
327        }
328    }
329
330    fn try_publish(&mut self, message: T) -> Result<(), T> {
331        if self.subscriber_count == 0 {
332            // We don't need to publish anything because there is no one to receive it
333            return Ok(());
334        }
335
336        if self.queue.is_full() {
337            return Err(message);
338        }
339        // We just did a check for this
340        self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
341
342        self.next_message_id += 1;
343
344        // Wake all of the subscribers
345        self.subscriber_wakers.wake();
346
347        Ok(())
348    }
349
350    fn publish_immediate(&mut self, message: T) {
351        // Make space in the queue if required
352        if self.queue.is_full() {
353            self.queue.pop_front();
354        }
355
356        // This will succeed because we made sure there is space
357        self.try_publish(message).ok().unwrap();
358    }
359
360    fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
361        let start_id = self.next_message_id - self.queue.len() as u64;
362
363        if message_id < start_id {
364            return Some(WaitResult::Lagged(start_id - message_id));
365        }
366
367        let current_message_index = (message_id - start_id) as usize;
368
369        if current_message_index >= self.queue.len() {
370            return None;
371        }
372
373        // We've checked that the index is valid
374        let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
375
376        // We're reading this item, so decrement the counter
377        queue_item.1 -= 1;
378
379        let message = if current_message_index == 0 && queue_item.1 == 0 {
380            let (message, _) = self.queue.pop_front().unwrap();
381            self.publisher_wakers.wake();
382            // Return pop'd message without clone
383            message
384        } else {
385            queue_item.0.clone()
386        };
387
388        Some(WaitResult::Message(message))
389    }
390
391    fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
392        self.subscriber_count -= 1;
393
394        // All messages that haven't been read yet by this subscriber must have their counter decremented
395        let start_id = self.next_message_id - self.queue.len() as u64;
396        if subscriber_next_message_id >= start_id {
397            let current_message_index = (subscriber_next_message_id - start_id) as usize;
398            self.queue
399                .iter_mut()
400                .skip(current_message_index)
401                .for_each(|(_, counter)| *counter -= 1);
402
403            let mut wake_publishers = false;
404            while let Some((_, count)) = self.queue.front() {
405                if *count == 0 {
406                    self.queue.pop_front().unwrap();
407                    wake_publishers = true;
408                } else {
409                    break;
410                }
411            }
412
413            if wake_publishers {
414                self.publisher_wakers.wake();
415            }
416        }
417    }
418
419    fn unregister_publisher(&mut self) {
420        self.publisher_count -= 1;
421    }
422
423    fn clear(&mut self) {
424        self.queue.clear();
425    }
426
427    fn len(&self) -> usize {
428        self.queue.len()
429    }
430
431    fn is_empty(&self) -> bool {
432        self.queue.is_empty()
433    }
434
435    fn is_full(&self) -> bool {
436        self.queue.is_full()
437    }
438}
439
440/// Error type for the [PubSubChannel]
441#[derive(Debug, PartialEq, Eq, Clone, Copy)]
442#[cfg_attr(feature = "defmt", derive(defmt::Format))]
443pub enum Error {
444    /// All subscriber slots are used. To add another subscriber, first another subscriber must be dropped or
445    /// the capacity of the channels must be increased.
446    MaximumSubscribersReached,
447    /// All publisher slots are used. To add another publisher, first another publisher must be dropped or
448    /// the capacity of the channels must be increased.
449    MaximumPublishersReached,
450}
451
452trait SealedPubSubBehavior<T> {
453    /// Try to get a message from the queue with the given message id.
454    ///
455    /// If the message is not yet present and a context is given, then its waker is registered in the subscriber wakers.
456    fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;
457
458    /// Get the amount of messages that are between the given the next_message_id and the most recent message.
459    /// This is not necessarily the amount of messages a subscriber can still received as it may have lagged.
460    fn available(&self, next_message_id: u64) -> u64;
461
462    /// Try to publish a message to the queue.
463    ///
464    /// If the queue is full and a context is given, then its waker is registered in the publisher wakers.
465    fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;
466
467    /// Returns the free capacity of the channel.
468    ///
469    /// This is equivalent to `capacity() - len()`
470    fn free_capacity(&self) -> usize;
471
472    /// Clears all elements in the channel.
473    fn clear(&self);
474
475    /// Returns the number of elements currently in the channel.
476    fn len(&self) -> usize;
477
478    /// Returns whether the channel is empty.
479    fn is_empty(&self) -> bool;
480
481    /// Let the channel know that a subscriber has dropped
482    fn unregister_subscriber(&self, subscriber_next_message_id: u64);
483
484    /// Let the channel know that a publisher has dropped
485    fn unregister_publisher(&self);
486}
487
488/// 'Middle level' behaviour of the pubsub channel.
489/// This trait is used so that Sub and Pub can be generic over the channel.
490#[allow(private_bounds)]
491pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
492    /// Publish a message immediately
493    fn publish_immediate(&self, message: T);
494
495    /// Returns the maximum number of elements the channel can hold.
496    fn capacity(&self) -> usize;
497
498    /// Returns whether the channel is full.
499    fn is_full(&self) -> bool;
500}
501
502/// The result of the subscriber wait procedure
503#[derive(Debug, Clone, PartialEq, Eq)]
504#[cfg_attr(feature = "defmt", derive(defmt::Format))]
505pub enum WaitResult<T> {
506    /// The subscriber did not receive all messages and lagged by the given amount of messages.
507    /// (This is the amount of messages that were missed)
508    Lagged(u64),
509    /// A message was received
510    Message(T),
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use crate::blocking_mutex::raw::NoopRawMutex;
517
518    #[futures_test::test]
519    async fn dyn_pub_sub_works() {
520        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
521
522        let mut sub0 = channel.dyn_subscriber().unwrap();
523        let mut sub1 = channel.dyn_subscriber().unwrap();
524        let pub0 = channel.dyn_publisher().unwrap();
525
526        pub0.publish(42).await;
527
528        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
529        assert_eq!(sub1.next_message().await, WaitResult::Message(42));
530
531        assert_eq!(sub0.try_next_message(), None);
532        assert_eq!(sub1.try_next_message(), None);
533    }
534
535    #[futures_test::test]
536    async fn all_subscribers_receive() {
537        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
538
539        let mut sub0 = channel.subscriber().unwrap();
540        let mut sub1 = channel.subscriber().unwrap();
541        let pub0 = channel.publisher().unwrap();
542
543        pub0.publish(42).await;
544
545        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
546        assert_eq!(sub1.next_message().await, WaitResult::Message(42));
547
548        assert_eq!(sub0.try_next_message(), None);
549        assert_eq!(sub1.try_next_message(), None);
550    }
551
552    #[futures_test::test]
553    async fn lag_when_queue_full_on_immediate_publish() {
554        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
555
556        let mut sub0 = channel.subscriber().unwrap();
557        let pub0 = channel.publisher().unwrap();
558
559        pub0.publish_immediate(42);
560        pub0.publish_immediate(43);
561        pub0.publish_immediate(44);
562        pub0.publish_immediate(45);
563        pub0.publish_immediate(46);
564        pub0.publish_immediate(47);
565
566        assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
567        assert_eq!(sub0.next_message().await, WaitResult::Message(44));
568        assert_eq!(sub0.next_message().await, WaitResult::Message(45));
569        assert_eq!(sub0.next_message().await, WaitResult::Message(46));
570        assert_eq!(sub0.next_message().await, WaitResult::Message(47));
571        assert_eq!(sub0.try_next_message(), None);
572    }
573
574    #[test]
575    fn limited_subs_and_pubs() {
576        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
577
578        let sub0 = channel.subscriber();
579        let sub1 = channel.subscriber();
580        let sub2 = channel.subscriber();
581        let sub3 = channel.subscriber();
582        let sub4 = channel.subscriber();
583
584        assert!(sub0.is_ok());
585        assert!(sub1.is_ok());
586        assert!(sub2.is_ok());
587        assert!(sub3.is_ok());
588        assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);
589
590        drop(sub0);
591
592        let sub5 = channel.subscriber();
593        assert!(sub5.is_ok());
594
595        // publishers
596
597        let pub0 = channel.publisher();
598        let pub1 = channel.publisher();
599        let pub2 = channel.publisher();
600        let pub3 = channel.publisher();
601        let pub4 = channel.publisher();
602
603        assert!(pub0.is_ok());
604        assert!(pub1.is_ok());
605        assert!(pub2.is_ok());
606        assert!(pub3.is_ok());
607        assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);
608
609        drop(pub0);
610
611        let pub5 = channel.publisher();
612        assert!(pub5.is_ok());
613    }
614
615    #[test]
616    fn publisher_wait_on_full_queue() {
617        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
618
619        let pub0 = channel.publisher().unwrap();
620
621        // There are no subscribers, so the queue will never be full
622        assert_eq!(pub0.try_publish(0), Ok(()));
623        assert_eq!(pub0.try_publish(0), Ok(()));
624        assert_eq!(pub0.try_publish(0), Ok(()));
625        assert_eq!(pub0.try_publish(0), Ok(()));
626        assert_eq!(pub0.try_publish(0), Ok(()));
627
628        let sub0 = channel.subscriber().unwrap();
629
630        assert_eq!(pub0.try_publish(0), Ok(()));
631        assert_eq!(pub0.try_publish(0), Ok(()));
632        assert_eq!(pub0.try_publish(0), Ok(()));
633        assert_eq!(pub0.try_publish(0), Ok(()));
634        assert!(pub0.is_full());
635        assert_eq!(pub0.try_publish(0), Err(0));
636
637        drop(sub0);
638    }
639
640    #[futures_test::test]
641    async fn correct_available() {
642        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
643
644        let sub0 = channel.subscriber().unwrap();
645        let mut sub1 = channel.subscriber().unwrap();
646        let pub0 = channel.publisher().unwrap();
647
648        assert_eq!(sub0.available(), 0);
649        assert_eq!(sub1.available(), 0);
650
651        pub0.publish(42).await;
652
653        assert_eq!(sub0.available(), 1);
654        assert_eq!(sub1.available(), 1);
655
656        sub1.next_message().await;
657
658        assert_eq!(sub1.available(), 0);
659
660        pub0.publish(42).await;
661
662        assert_eq!(sub0.available(), 2);
663        assert_eq!(sub1.available(), 1);
664    }
665
666    #[futures_test::test]
667    async fn correct_len() {
668        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
669
670        let mut sub0 = channel.subscriber().unwrap();
671        let mut sub1 = channel.subscriber().unwrap();
672        let pub0 = channel.publisher().unwrap();
673
674        assert!(sub0.is_empty());
675        assert!(sub1.is_empty());
676        assert!(pub0.is_empty());
677        assert_eq!(pub0.free_capacity(), 4);
678        assert_eq!(pub0.len(), 0);
679
680        pub0.publish(42).await;
681
682        assert_eq!(pub0.free_capacity(), 3);
683        assert_eq!(pub0.len(), 1);
684
685        pub0.publish(42).await;
686
687        assert_eq!(pub0.free_capacity(), 2);
688        assert_eq!(pub0.len(), 2);
689
690        sub0.next_message().await;
691        sub0.next_message().await;
692
693        assert_eq!(pub0.free_capacity(), 2);
694        assert_eq!(pub0.len(), 2);
695
696        sub1.next_message().await;
697        assert_eq!(pub0.free_capacity(), 3);
698        assert_eq!(pub0.len(), 1);
699
700        sub1.next_message().await;
701        assert_eq!(pub0.free_capacity(), 4);
702        assert_eq!(pub0.len(), 0);
703    }
704
705    #[futures_test::test]
706    async fn empty_channel_when_last_subscriber_is_dropped() {
707        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
708
709        let pub0 = channel.publisher().unwrap();
710        let mut sub0 = channel.subscriber().unwrap();
711        let mut sub1 = channel.subscriber().unwrap();
712
713        assert_eq!(4, pub0.free_capacity());
714
715        pub0.publish(1).await;
716        pub0.publish(2).await;
717
718        assert_eq!(2, channel.free_capacity());
719
720        assert_eq!(1, sub0.try_next_message_pure().unwrap());
721        assert_eq!(2, sub0.try_next_message_pure().unwrap());
722
723        assert_eq!(2, channel.free_capacity());
724
725        drop(sub0);
726
727        assert_eq!(2, channel.free_capacity());
728
729        assert_eq!(1, sub1.try_next_message_pure().unwrap());
730
731        assert_eq!(3, channel.free_capacity());
732
733        drop(sub1);
734
735        assert_eq!(4, channel.free_capacity());
736    }
737
738    struct CloneCallCounter(usize);
739
740    impl Clone for CloneCallCounter {
741        fn clone(&self) -> Self {
742            Self(self.0 + 1)
743        }
744    }
745
746    #[futures_test::test]
747    async fn skip_clone_for_last_message() {
748        let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
749        let pub0 = channel.publisher().unwrap();
750        let mut sub0 = channel.subscriber().unwrap();
751        let mut sub1 = channel.subscriber().unwrap();
752
753        pub0.publish(CloneCallCounter(0)).await;
754
755        assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
756        assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
757    }
758
759    #[futures_test::test]
760    async fn publisher_sink() {
761        use futures_util::{SinkExt, StreamExt};
762
763        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();
764
765        let mut sub = channel.subscriber().unwrap();
766
767        let publ = channel.publisher().unwrap();
768        let mut sink = publ.sink();
769
770        sink.send(0).await.unwrap();
771        assert_eq!(0, sub.try_next_message_pure().unwrap());
772
773        sink.send(1).await.unwrap();
774        assert_eq!(1, sub.try_next_message_pure().unwrap());
775
776        sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
777            .await
778            .unwrap();
779        assert_eq!(0, sub.try_next_message_pure().unwrap());
780        assert_eq!(1, sub.try_next_message_pure().unwrap());
781        assert_eq!(2, sub.try_next_message_pure().unwrap());
782        assert_eq!(3, sub.try_next_message_pure().unwrap());
783    }
784}