1#![deny(missing_docs)]
4
5use core::cell::RefCell;
6use core::fmt::Debug;
7use core::task::{Context, Poll};
8
9use heapless::Deque;
10
11use self::publisher::{ImmediatePub, Pub};
12use self::subscriber::Sub;
13use crate::blocking_mutex::raw::RawMutex;
14use crate::blocking_mutex::Mutex;
15use crate::waitqueue::MultiWakerRegistration;
16
17pub mod publisher;
18pub mod subscriber;
19
20pub use publisher::{DynImmediatePublisher, DynPublisher, ImmediatePublisher, Publisher};
21pub use subscriber::{DynSubscriber, Subscriber};
22
/// A broadcast channel in which every queued message is made available to every subscriber.
///
/// The channel buffers up to `CAP` messages of type `T`. At most `SUBS` subscribers and `PUBS`
/// publishers can be registered at the same time (see [`PubSubChannel::subscriber`] and
/// [`PubSubChannel::publisher`]); immediate publishers are not counted.
///
/// A message is cloned for every subscriber that reads it, except for the last reader of the
/// oldest message, which receives the stored value itself. That is why `T: Clone` is required.
///
/// All state is kept behind a blocking [`Mutex`] built on the raw mutex type `M`.
pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
    // Shared bookkeeping; the `RefCell` gives mutable access from inside `Mutex::lock`.
    inner: Mutex<M, RefCell<PubSubState<T, CAP, SUBS, PUBS>>>,
}
77
78impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>
79 PubSubChannel<M, T, CAP, SUBS, PUBS>
80{
81 pub const fn new() -> Self {
83 Self {
84 inner: Mutex::const_new(M::INIT, RefCell::new(PubSubState::new())),
85 }
86 }
87
88 pub fn subscriber(&self) -> Result<Subscriber<M, T, CAP, SUBS, PUBS>, Error> {
92 self.inner.lock(|inner| {
93 let mut s = inner.borrow_mut();
94
95 if s.subscriber_count >= SUBS {
96 Err(Error::MaximumSubscribersReached)
97 } else {
98 s.subscriber_count += 1;
99 Ok(Subscriber(Sub::new(s.next_message_id, self)))
100 }
101 })
102 }
103
104 pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error> {
108 self.inner.lock(|inner| {
109 let mut s = inner.borrow_mut();
110
111 if s.subscriber_count >= SUBS {
112 Err(Error::MaximumSubscribersReached)
113 } else {
114 s.subscriber_count += 1;
115 Ok(DynSubscriber(Sub::new(s.next_message_id, self)))
116 }
117 })
118 }
119
120 pub fn publisher(&self) -> Result<Publisher<M, T, CAP, SUBS, PUBS>, Error> {
124 self.inner.lock(|inner| {
125 let mut s = inner.borrow_mut();
126
127 if s.publisher_count >= PUBS {
128 Err(Error::MaximumPublishersReached)
129 } else {
130 s.publisher_count += 1;
131 Ok(Publisher(Pub::new(self)))
132 }
133 })
134 }
135
136 pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error> {
140 self.inner.lock(|inner| {
141 let mut s = inner.borrow_mut();
142
143 if s.publisher_count >= PUBS {
144 Err(Error::MaximumPublishersReached)
145 } else {
146 s.publisher_count += 1;
147 Ok(DynPublisher(Pub::new(self)))
148 }
149 })
150 }
151
152 pub fn immediate_publisher(&self) -> ImmediatePublisher<M, T, CAP, SUBS, PUBS> {
155 ImmediatePublisher(ImmediatePub::new(self))
156 }
157
158 pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<T> {
161 DynImmediatePublisher(ImmediatePub::new(self))
162 }
163
164 pub const fn capacity(&self) -> usize {
166 CAP
167 }
168
169 pub fn free_capacity(&self) -> usize {
173 CAP - self.len()
174 }
175
176 pub fn clear(&self) {
178 self.inner.lock(|inner| inner.borrow_mut().clear());
179 }
180
181 pub fn len(&self) -> usize {
183 self.inner.lock(|inner| inner.borrow().len())
184 }
185
186 pub fn is_empty(&self) -> bool {
188 self.inner.lock(|inner| inner.borrow().is_empty())
189 }
190
191 pub fn is_full(&self) -> bool {
193 self.inner.lock(|inner| inner.borrow().is_full())
194 }
195}
196
197impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> crate::pubsub::PubSubBehavior<T>
198 for PubSubChannel<M, T, CAP, SUBS, PUBS>
199{
200 fn publish_immediate(&self, message: T) {
201 self.inner.lock(|s| {
202 let mut s = s.borrow_mut();
203 s.publish_immediate(message)
204 })
205 }
206
207 fn capacity(&self) -> usize {
208 self.capacity()
209 }
210
211 fn is_full(&self) -> bool {
212 self.is_full()
213 }
214}
215
216impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> SealedPubSubBehavior<T>
217 for PubSubChannel<M, T, CAP, SUBS, PUBS>
218{
219 fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>> {
220 self.inner.lock(|s| {
221 let mut s = s.borrow_mut();
222
223 match s.get_message(*next_message_id) {
225 Some(WaitResult::Message(message)) => {
227 *next_message_id += 1;
228 Poll::Ready(WaitResult::Message(message))
229 }
230 None => {
232 if let Some(cx) = cx {
233 s.subscriber_wakers.register(cx.waker());
234 }
235 Poll::Pending
236 }
237 Some(WaitResult::Lagged(amount)) => {
239 *next_message_id += amount;
240 Poll::Ready(WaitResult::Lagged(amount))
241 }
242 }
243 })
244 }
245
246 fn available(&self, next_message_id: u64) -> u64 {
247 self.inner.lock(|s| s.borrow().next_message_id - next_message_id)
248 }
249
250 fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T> {
251 self.inner.lock(|s| {
252 let mut s = s.borrow_mut();
253 match s.try_publish(message) {
255 Ok(()) => Ok(()),
257 Err(message) => {
259 if let Some(cx) = cx {
260 s.publisher_wakers.register(cx.waker());
261 }
262 Err(message)
263 }
264 }
265 })
266 }
267
268 fn unregister_subscriber(&self, subscriber_next_message_id: u64) {
269 self.inner.lock(|s| {
270 let mut s = s.borrow_mut();
271 s.unregister_subscriber(subscriber_next_message_id)
272 })
273 }
274
275 fn unregister_publisher(&self) {
276 self.inner.lock(|s| {
277 let mut s = s.borrow_mut();
278 s.unregister_publisher()
279 })
280 }
281
282 fn free_capacity(&self) -> usize {
283 self.free_capacity()
284 }
285
286 fn clear(&self) {
287 self.clear();
288 }
289
290 fn len(&self) -> usize {
291 self.len()
292 }
293
294 fn is_empty(&self) -> bool {
295 self.is_empty()
296 }
297}
298
/// Bookkeeping of a [`PubSubChannel`]; only accessed while the channel's mutex is held.
struct PubSubState<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> {
    /// Buffered messages, oldest first. Each message is paired with the number of subscribers
    /// that still have to read it; the counter starts at the subscriber count at publish time.
    queue: Deque<(T, usize), CAP>,
    /// Id the next queued message will get. Incremented once for every message pushed onto the
    /// queue, so the oldest queued message has id `next_message_id - queue.len()`.
    next_message_id: u64,
    /// Wakers of subscribers that found nothing to read; woken whenever a message is queued.
    subscriber_wakers: MultiWakerRegistration<SUBS>,
    /// Wakers of publishers that found the queue full; woken whenever a message leaves the queue
    /// because all of its readers are done with it.
    publisher_wakers: MultiWakerRegistration<PUBS>,
    /// Number of subscribers currently registered.
    subscriber_count: usize,
    /// Number of (non-immediate) publishers currently registered.
    publisher_count: usize,
}
316
317impl<T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubState<T, CAP, SUBS, PUBS> {
318 const fn new() -> Self {
320 Self {
321 queue: Deque::new(),
322 next_message_id: 0,
323 subscriber_wakers: MultiWakerRegistration::new(),
324 publisher_wakers: MultiWakerRegistration::new(),
325 subscriber_count: 0,
326 publisher_count: 0,
327 }
328 }
329
330 fn try_publish(&mut self, message: T) -> Result<(), T> {
331 if self.subscriber_count == 0 {
332 return Ok(());
334 }
335
336 if self.queue.is_full() {
337 return Err(message);
338 }
339 self.queue.push_back((message, self.subscriber_count)).ok().unwrap();
341
342 self.next_message_id += 1;
343
344 self.subscriber_wakers.wake();
346
347 Ok(())
348 }
349
350 fn publish_immediate(&mut self, message: T) {
351 if self.queue.is_full() {
353 self.queue.pop_front();
354 }
355
356 self.try_publish(message).ok().unwrap();
358 }
359
360 fn get_message(&mut self, message_id: u64) -> Option<WaitResult<T>> {
361 let start_id = self.next_message_id - self.queue.len() as u64;
362
363 if message_id < start_id {
364 return Some(WaitResult::Lagged(start_id - message_id));
365 }
366
367 let current_message_index = (message_id - start_id) as usize;
368
369 if current_message_index >= self.queue.len() {
370 return None;
371 }
372
373 let queue_item = self.queue.iter_mut().nth(current_message_index).unwrap();
375
376 queue_item.1 -= 1;
378
379 let message = if current_message_index == 0 && queue_item.1 == 0 {
380 let (message, _) = self.queue.pop_front().unwrap();
381 self.publisher_wakers.wake();
382 message
384 } else {
385 queue_item.0.clone()
386 };
387
388 Some(WaitResult::Message(message))
389 }
390
391 fn unregister_subscriber(&mut self, subscriber_next_message_id: u64) {
392 self.subscriber_count -= 1;
393
394 let start_id = self.next_message_id - self.queue.len() as u64;
396 if subscriber_next_message_id >= start_id {
397 let current_message_index = (subscriber_next_message_id - start_id) as usize;
398 self.queue
399 .iter_mut()
400 .skip(current_message_index)
401 .for_each(|(_, counter)| *counter -= 1);
402
403 let mut wake_publishers = false;
404 while let Some((_, count)) = self.queue.front() {
405 if *count == 0 {
406 self.queue.pop_front().unwrap();
407 wake_publishers = true;
408 } else {
409 break;
410 }
411 }
412
413 if wake_publishers {
414 self.publisher_wakers.wake();
415 }
416 }
417 }
418
419 fn unregister_publisher(&mut self) {
420 self.publisher_count -= 1;
421 }
422
423 fn clear(&mut self) {
424 self.queue.clear();
425 }
426
427 fn len(&self) -> usize {
428 self.queue.len()
429 }
430
431 fn is_empty(&self) -> bool {
432 self.queue.is_empty()
433 }
434
435 fn is_full(&self) -> bool {
436 self.queue.is_full()
437 }
438}
439
/// Errors returned when registering a subscriber or publisher on a [`PubSubChannel`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum Error {
    /// The channel already has as many subscribers as its `SUBS` parameter allows.
    MaximumSubscribersReached,
    /// The channel already has as many publishers as its `PUBS` parameter allows.
    MaximumPublishersReached,
}
451
/// Channel operations used internally by publishers and subscribers.
///
/// The trait is private to this module, so it cannot be implemented or called from outside it
/// and its submodules.
trait SealedPubSubBehavior<T> {
    /// Tries to read the message with id `*next_message_id`.
    ///
    /// When a result is ready, `next_message_id` is advanced past the delivered message or past
    /// the messages that were missed. When nothing is available and `cx` is given, its waker is
    /// registered and `Poll::Pending` is returned.
    fn get_message_with_context(&self, next_message_id: &mut u64, cx: Option<&mut Context<'_>>) -> Poll<WaitResult<T>>;

    /// Returns the number of message ids between `next_message_id` and the channel's next id.
    fn available(&self, next_message_id: u64) -> u64;

    /// Tries to publish `message`.
    ///
    /// When the queue is full the message is returned as `Err` and, if `cx` is given, its waker
    /// is registered so the publisher can be woken when space becomes available.
    fn publish_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), T>;

    /// Returns how many more messages can be queued before the channel is full.
    fn free_capacity(&self) -> usize;

    /// Removes every queued message.
    fn clear(&self);

    /// Returns the number of queued messages.
    fn len(&self) -> usize;

    /// Returns `true` if no message is queued.
    fn is_empty(&self) -> bool;

    /// Removes a subscriber whose next unread message id is `subscriber_next_message_id`.
    fn unregister_subscriber(&self, subscriber_next_message_id: u64);

    /// Removes a publisher from the channel's bookkeeping.
    fn unregister_publisher(&self);
}
487
/// Public, type-erasable face of a pub/sub channel.
///
/// The supertrait is private, so this trait can only be implemented inside this module.
// The private supertrait is what triggers `private_bounds`; it is kept private on purpose so
// the internal channel operations stay out of the public API.
#[allow(private_bounds)]
pub trait PubSubBehavior<T>: SealedPubSubBehavior<T> {
    /// Publishes `message` right away, without waiting for space in the queue.
    fn publish_immediate(&self, message: T);

    /// Returns the maximum number of messages the channel can buffer.
    fn capacity(&self) -> usize;

    /// Returns `true` if the queue cannot take another message.
    fn is_full(&self) -> bool;
}
501
/// Outcome of asking the channel for a subscriber's next message.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum WaitResult<T> {
    /// The subscriber fell behind: the contained number of messages left the queue before it
    /// could read them and have been skipped.
    Lagged(u64),
    /// The next message for the subscriber.
    Message(T),
}
512
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blocking_mutex::raw::NoopRawMutex;

    // The `dyn` publisher/subscriber variants deliver each message to every subscriber.
    #[futures_test::test]
    async fn dyn_pub_sub_works() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut sub0 = channel.dyn_subscriber().unwrap();
        let mut sub1 = channel.dyn_subscriber().unwrap();
        let pub0 = channel.dyn_publisher().unwrap();

        pub0.publish(42).await;

        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
        assert_eq!(sub1.next_message().await, WaitResult::Message(42));

        // Both subscribers are caught up now.
        assert_eq!(sub0.try_next_message(), None);
        assert_eq!(sub1.try_next_message(), None);
    }

    // Same scenario as above with the statically typed publisher/subscriber.
    #[futures_test::test]
    async fn all_subscribers_receive() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut sub0 = channel.subscriber().unwrap();
        let mut sub1 = channel.subscriber().unwrap();
        let pub0 = channel.publisher().unwrap();

        pub0.publish(42).await;

        assert_eq!(sub0.next_message().await, WaitResult::Message(42));
        assert_eq!(sub1.next_message().await, WaitResult::Message(42));

        assert_eq!(sub0.try_next_message(), None);
        assert_eq!(sub1.try_next_message(), None);
    }

    // Immediate publishing evicts the oldest messages once the queue is full; the subscriber
    // is told how many it missed before it receives the surviving ones.
    #[futures_test::test]
    async fn lag_when_queue_full_on_immediate_publish() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut sub0 = channel.subscriber().unwrap();
        let pub0 = channel.publisher().unwrap();

        // Six messages into a queue of four: 42 and 43 get evicted.
        pub0.publish_immediate(42);
        pub0.publish_immediate(43);
        pub0.publish_immediate(44);
        pub0.publish_immediate(45);
        pub0.publish_immediate(46);
        pub0.publish_immediate(47);

        assert_eq!(sub0.try_next_message(), Some(WaitResult::Lagged(2)));
        assert_eq!(sub0.next_message().await, WaitResult::Message(44));
        assert_eq!(sub0.next_message().await, WaitResult::Message(45));
        assert_eq!(sub0.next_message().await, WaitResult::Message(46));
        assert_eq!(sub0.next_message().await, WaitResult::Message(47));
        assert_eq!(sub0.try_next_message(), None);
    }

    // The SUBS / PUBS limits are enforced, and dropping a handle frees its slot again.
    #[test]
    fn limited_subs_and_pubs() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        // Subscribers: four fit, the fifth is rejected.
        let sub0 = channel.subscriber();
        let sub1 = channel.subscriber();
        let sub2 = channel.subscriber();
        let sub3 = channel.subscriber();
        let sub4 = channel.subscriber();

        assert!(sub0.is_ok());
        assert!(sub1.is_ok());
        assert!(sub2.is_ok());
        assert!(sub3.is_ok());
        assert_eq!(sub4.err().unwrap(), Error::MaximumSubscribersReached);

        drop(sub0);

        let sub5 = channel.subscriber();
        assert!(sub5.is_ok());

        // Publishers: four fit, the fifth is rejected.
        let pub0 = channel.publisher();
        let pub1 = channel.publisher();
        let pub2 = channel.publisher();
        let pub3 = channel.publisher();
        let pub4 = channel.publisher();

        assert!(pub0.is_ok());
        assert!(pub1.is_ok());
        assert!(pub2.is_ok());
        assert!(pub3.is_ok());
        assert_eq!(pub4.err().unwrap(), Error::MaximumPublishersReached);

        drop(pub0);

        let pub5 = channel.publisher();
        assert!(pub5.is_ok());
    }

    // `try_publish` only fails once a subscriber exists and the queue has filled up.
    #[test]
    fn publisher_wait_on_full_queue() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let pub0 = channel.publisher().unwrap();

        // Without subscribers nothing is queued, so more than CAP publishes succeed.
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));

        let sub0 = channel.subscriber().unwrap();

        // With a subscriber the queue fills after four messages and the fifth is rejected.
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert_eq!(pub0.try_publish(0), Ok(()));
        assert!(pub0.is_full());
        assert_eq!(pub0.try_publish(0), Err(0));

        drop(sub0);
    }

    // `available` counts the messages each subscriber has not read yet.
    #[futures_test::test]
    async fn correct_available() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let sub0 = channel.subscriber().unwrap();
        let mut sub1 = channel.subscriber().unwrap();
        let pub0 = channel.publisher().unwrap();

        assert_eq!(sub0.available(), 0);
        assert_eq!(sub1.available(), 0);

        pub0.publish(42).await;

        assert_eq!(sub0.available(), 1);
        assert_eq!(sub1.available(), 1);

        sub1.next_message().await;

        assert_eq!(sub1.available(), 0);

        pub0.publish(42).await;

        // sub0 has read nothing, sub1 has read one message.
        assert_eq!(sub0.available(), 2);
        assert_eq!(sub1.available(), 1);
    }

    // `len` / `free_capacity` only change once every subscriber has read a message.
    #[futures_test::test]
    async fn correct_len() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut sub0 = channel.subscriber().unwrap();
        let mut sub1 = channel.subscriber().unwrap();
        let pub0 = channel.publisher().unwrap();

        assert!(sub0.is_empty());
        assert!(sub1.is_empty());
        assert!(pub0.is_empty());
        assert_eq!(pub0.free_capacity(), 4);
        assert_eq!(pub0.len(), 0);

        pub0.publish(42).await;

        assert_eq!(pub0.free_capacity(), 3);
        assert_eq!(pub0.len(), 1);

        pub0.publish(42).await;

        assert_eq!(pub0.free_capacity(), 2);
        assert_eq!(pub0.len(), 2);

        sub0.next_message().await;
        sub0.next_message().await;

        // sub1 still has to read both messages, so they stay queued.
        assert_eq!(pub0.free_capacity(), 2);
        assert_eq!(pub0.len(), 2);

        sub1.next_message().await;
        assert_eq!(pub0.free_capacity(), 3);
        assert_eq!(pub0.len(), 1);

        sub1.next_message().await;
        assert_eq!(pub0.free_capacity(), 4);
        assert_eq!(pub0.len(), 0);
    }

    // Dropping subscribers releases their claim on unread messages.
    #[futures_test::test]
    async fn empty_channel_when_last_subscriber_is_dropped() {
        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let pub0 = channel.publisher().unwrap();
        let mut sub0 = channel.subscriber().unwrap();
        let mut sub1 = channel.subscriber().unwrap();

        assert_eq!(4, pub0.free_capacity());

        pub0.publish(1).await;
        pub0.publish(2).await;

        assert_eq!(2, channel.free_capacity());

        assert_eq!(1, sub0.try_next_message_pure().unwrap());
        assert_eq!(2, sub0.try_next_message_pure().unwrap());

        assert_eq!(2, channel.free_capacity());

        // sub0 had read everything, so dropping it frees nothing.
        drop(sub0);

        assert_eq!(2, channel.free_capacity());

        assert_eq!(1, sub1.try_next_message_pure().unwrap());

        assert_eq!(3, channel.free_capacity());

        // sub1 was the last reader of the remaining message, so dropping it empties the queue.
        drop(sub1);

        assert_eq!(4, channel.free_capacity());
    }

    // Message type whose payload counts how many times it has been cloned.
    struct CloneCallCounter(usize);

    impl Clone for CloneCallCounter {
        fn clone(&self) -> Self {
            Self(self.0 + 1)
        }
    }

    // The last reader of a message receives the stored value instead of a clone.
    #[futures_test::test]
    async fn skip_clone_for_last_message() {
        let channel = PubSubChannel::<NoopRawMutex, CloneCallCounter, 1, 2, 1>::new();
        let pub0 = channel.publisher().unwrap();
        let mut sub0 = channel.subscriber().unwrap();
        let mut sub1 = channel.subscriber().unwrap();

        pub0.publish(CloneCallCounter(0)).await;

        // First reader gets a clone (count 1), last reader gets the original (count 0).
        assert_eq!(1, sub0.try_next_message_pure().unwrap().0);
        assert_eq!(0, sub1.try_next_message_pure().unwrap().0);
    }

    // A publisher can be driven through the `Sink` interface.
    #[futures_test::test]
    async fn publisher_sink() {
        use futures_util::{SinkExt, StreamExt};

        let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

        let mut sub = channel.subscriber().unwrap();

        let publ = channel.publisher().unwrap();
        let mut sink = publ.sink();

        sink.send(0).await.unwrap();
        assert_eq!(0, sub.try_next_message_pure().unwrap());

        sink.send(1).await.unwrap();
        assert_eq!(1, sub.try_next_message_pure().unwrap());

        // Four items exactly fill the queue.
        sink.send_all(&mut futures_util::stream::iter(0..4).map(Ok))
            .await
            .unwrap();
        assert_eq!(0, sub.try_next_message_pure().unwrap());
        assert_eq!(1, sub.try_next_message_pure().unwrap());
        assert_eq!(2, sub.try_next_message_pure().unwrap());
        assert_eq!(3, sub.try_next_message_pure().unwrap());
    }
}