// kube_runtime/reflector/dispatcher.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5use std::{fmt::Debug, sync::Arc};
6
7use educe::Educe;
8use futures::Stream;
9use pin_project::pin_project;
10use std::task::ready;
11
12use crate::reflector::{ObjectRef, Store};
13use async_broadcast::{InactiveReceiver, Receiver, Sender};
14
15use super::Lookup;
16
#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
// A helper type that holds a broadcast transmitter and a broadcast receiver,
// used to fan-out events from a root stream to multiple listeners.
//
// Cloning a `Dispatcher` clones both halves, so clones share the same
// underlying broadcast channel.
pub(crate) struct Dispatcher<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    // Sending half of the broadcast channel; new subscriber receivers are
    // minted from this handle (see `subscribe`).
    dispatch_tx: Sender<ObjectRef<K>>,
    // An inactive reader that prevents the channel from closing until the
    // writer is dropped.
    _dispatch_rx: InactiveReceiver<ObjectRef<K>>,
}
31
32impl<K> Dispatcher<K>
33where
34    K: Lookup + Clone + 'static,
35    K::DynamicType: Eq + std::hash::Hash + Clone,
36{
37    /// Creates and returns a new self that wraps a broadcast sender and an
38    /// inactive broadcast receiver
39    ///
40    /// A buffer size is required to create the underlying broadcast channel.
41    /// Messages will be buffered until all active readers have received a copy
42    /// of the message. When the channel is full, senders will apply
43    /// backpressure by waiting for space to free up.
44    //
45    // N.B messages are eagerly broadcasted, meaning no active receivers are
46    // required for a message to be broadcasted.
47    pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
48        // Create a broadcast (tx, rx) pair
49        let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
50        // The tx half will not wait for any receivers to be active before
51        // broadcasting events. If no receivers are active, events will be
52        // buffered.
53        dispatch_tx.set_await_active(false);
54        Self {
55            dispatch_tx,
56            _dispatch_rx: dispatch_rx.deactivate(),
57        }
58    }
59
60    // Calls broadcast on the channel. Will return when the channel has enough
61    // space to send an event.
62    pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
63        let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
64    }
65
66    // Creates a `ReflectHandle` by creating a receiver from the tx half.
67    // N.B: the new receiver will be fast-forwarded to the _latest_ event.
68    // The receiver won't have access to any events that are currently waiting
69    // to be acked by listeners.
70    pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
71        ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
72    }
73}
74
/// A handle to a shared stream reader
///
/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`],
/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each
/// shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s
/// subscribed to the stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`ReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
///
/// [`Writer`]: crate::reflector::Writer
#[pin_project]
pub struct ReflectHandle<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    // Receiving half of the dispatcher's broadcast channel; yields the
    // `ObjectRef`s of touched objects.
    #[pin]
    rx: Receiver<ObjectRef<K>>,
    // Store reader used to resolve each received `ObjectRef` into the cached
    // object.
    reader: Store<K>,
}
99
100impl<K> Clone for ReflectHandle<K>
101where
102    K: Lookup + Clone + 'static,
103    K::DynamicType: Eq + std::hash::Hash + Clone,
104{
105    fn clone(&self) -> Self {
106        ReflectHandle::new(self.reader.clone(), self.rx.clone())
107    }
108}
109
110impl<K> ReflectHandle<K>
111where
112    K: Lookup + Clone,
113    K::DynamicType: Eq + std::hash::Hash + Clone,
114{
115    pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
116        Self { rx, reader }
117    }
118
119    #[must_use]
120    pub fn reader(&self) -> Store<K> {
121        self.reader.clone()
122    }
123}
124
125impl<K> Stream for ReflectHandle<K>
126where
127    K: Lookup + Clone,
128    K::DynamicType: Eq + std::hash::Hash + Clone + Default,
129{
130    type Item = Arc<K>;
131
132    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        let mut this = self.project();
134        match ready!(this.rx.as_mut().poll_next(cx)) {
135            Some(obj_ref) => this
136                .reader
137                .get(&obj_ref)
138                .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
139            None => Poll::Ready(None),
140        }
141    }
142}
143
#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
    use crate::{
        watcher::{Error, Event},
        WatchStreamExt,
    };
    use std::{pin::pin, sync::Arc, task::Poll};

    use crate::reflector;
    use futures::{poll, stream, StreamExt};
    use k8s_openapi::api::core::v1::Pod;

    // Minimal Pod fixture: only `metadata.name` is set.
    fn testpod(name: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_string());
        pod
    }

    // The shared reflector must forward every event (including errors)
    // unchanged to the downstream consumer while updating the store.
    #[tokio::test]
    async fn events_are_passed_through() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);

        let (reader, writer) = reflector::store_shared(10);
        let mut reflect = pin!(st.reflect_shared(writer));

        // Prior to any polls, we should have an empty store.
        assert_eq!(reader.len(), 0);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));

        // Make progress and assert all events are seen
        assert_eq!(reader.len(), 1);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init)))));
        assert_eq!(reader.len(), 1);

        // InitApply events buffer the new page; the store is only swapped on
        // InitDone, so the reader still sees the pre-init contents here.
        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone)))));
        assert_eq!(reader.len(), 2);

        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(reader.len(), 2);
    }

    #[tokio::test]
    async fn readers_yield_touched_objects() {
        // Readers should yield touched objects they receive from Stream events.
        //
        // NOTE: a Delete(_) event will be ignored if the item does not exist in
        // the cache. Same with a Restarted(vec![delete_item])
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Delete(foo.clone())),
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut reflect = pin!(st.reflect_shared(writer));

        // Deleted events should be skipped by subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Delete(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Errors are not propagated to subscribers.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert!(matches!(poll!(subscriber.next()), Poll::Pending));

        // Restart event will yield all objects in the list

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));

        // these don't come back in order atm:
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

        // When main channel is closed, it is propagated to subscribers
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn readers_yield_when_tx_drops() {
        // Once the main stream is dropped, readers should continue to make
        // progress and read values that have been sent on the channel.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        // Box::pin (rather than pin!) so the stream can be dropped explicitly
        // mid-test below.
        let mut reflect = Box::pin(st.reflect_shared(writer));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Restart event will yield all objects in the list. Broadcast values
        // without polling and then drop.
        //
        // First, subscribers should be pending.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        drop(reflect);

        // we will get foo and bar here, but we dont have a guaranteed ordering on page events
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn reflect_applies_backpressure() {
        // When the channel is full, we should observe backpressure applied.
        //
        // This will be manifested by receiving Poll::Pending on the reflector
        // stream while the reader stream is not polled. Once we unblock the
        // buffer, the reflector will make progress.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            //TODO: include a ready event here to avoid dealing with Init?
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Apply(bar.clone())),
            Ok(Event::Apply(foo.clone())),
        ]);

        let foo = Arc::new(foo);
        let bar = Arc::new(bar);

        // Buffer size 1 so that a single unread event fills the channel.
        let (_, writer) = reflector::store_shared(1);
        let mut subscriber = pin!(writer.subscribe().unwrap());
        let mut subscriber_slow = pin!(writer.subscribe().unwrap());
        let mut reflect = pin!(st.reflect_shared(writer));

        assert_eq!(poll!(subscriber.next()), Poll::Pending);
        assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);

        // Poll first subscriber, but not the second.
        //
        // The buffer can hold one object value, so even if we have a slow subscriber,
        // we will still get an event from the root.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // One subscriber is not reading, so we need to apply backpressure until
        // channel has capacity.
        //
        // At this point, the buffer is full. Polling again will trigger the
        // backpressure logic.
        assert!(matches!(poll!(reflect.next()), Poll::Pending));

        // Our "fast" subscriber will also have nothing else to poll until the
        // slower subscriber advances its pointer in the buffer.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // Advance slow reader
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        // We now have room for only one more item. In total, the previous event
        // had two. We repeat the same pattern.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(poll!(reflect.next()), Poll::Pending));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        // Poll again to drain the queue.
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
    }

    // TODO (matei): tests around cloning subscribers once a watch stream has already
    // been established. This will depend on the interfaces & impl so are left
    // out for now.
}