1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5use std::{fmt::Debug, sync::Arc};
6
7use educe::Educe;
8use futures::Stream;
9use pin_project::pin_project;
10use std::task::ready;
11
12use crate::reflector::{ObjectRef, Store};
13use async_broadcast::{InactiveReceiver, Receiver, Sender};
14
15use super::Lookup;
16
/// Fan-out helper wrapping the sending half of an `async_broadcast` channel
/// that carries [`ObjectRef`]s.
///
/// New receivers are handed out through `subscribe`, and object references are
/// pushed to them through `broadcast`.
#[derive(Educe)]
#[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)]
pub(crate) struct Dispatcher<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Sending half of the broadcast channel; also used to mint new receivers.
    dispatch_tx: Sender<ObjectRef<K>>,
    /// Deactivated receiver created together with the sender. It is never
    /// read from; per the `async_broadcast` documentation an inactive receiver
    /// only serves to keep the channel open while no active receiver exists.
    _dispatch_rx: InactiveReceiver<ObjectRef<K>>,
}
31
32impl<K> Dispatcher<K>
33where
34 K: Lookup + Clone + 'static,
35 K::DynamicType: Eq + std::hash::Hash + Clone,
36{
37 pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
48 let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
50 dispatch_tx.set_await_active(false);
54 Self {
55 dispatch_tx,
56 _dispatch_rx: dispatch_rx.deactivate(),
57 }
58 }
59
60 pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
63 let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
64 }
65
66 pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
71 ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
72 }
73}
74
/// A subscriber-side handle onto a shared reflector stream.
///
/// The handle couples a broadcast receiver of [`ObjectRef`]s with a [`Store`]
/// reader. Polled as a `Stream`, it takes object references from the receiver
/// and yields the matching objects looked up in the store.
///
/// Handles can be duplicated with `clone()`.
#[pin_project]
pub struct ReflectHandle<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Receiving half of the broadcast channel; pinned because it is polled
    /// as a stream.
    #[pin]
    rx: Receiver<ObjectRef<K>>,
    /// Store used to resolve received references into objects.
    reader: Store<K>,
}
99
100impl<K> Clone for ReflectHandle<K>
101where
102 K: Lookup + Clone + 'static,
103 K::DynamicType: Eq + std::hash::Hash + Clone,
104{
105 fn clone(&self) -> Self {
106 ReflectHandle::new(self.reader.clone(), self.rx.clone())
107 }
108}
109
110impl<K> ReflectHandle<K>
111where
112 K: Lookup + Clone,
113 K::DynamicType: Eq + std::hash::Hash + Clone,
114{
115 pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
116 Self { rx, reader }
117 }
118
119 #[must_use]
120 pub fn reader(&self) -> Store<K> {
121 self.reader.clone()
122 }
123}
124
125impl<K> Stream for ReflectHandle<K>
126where
127 K: Lookup + Clone,
128 K::DynamicType: Eq + std::hash::Hash + Clone + Default,
129{
130 type Item = Arc<K>;
131
132 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let mut this = self.project();
134 match ready!(this.rx.as_mut().poll_next(cx)) {
135 Some(obj_ref) => this
136 .reader
137 .get(&obj_ref)
138 .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
139 None => Poll::Ready(None),
140 }
141 }
142}
143
#[cfg(all(test, feature = "unstable-runtime-subscribe"))]
pub(crate) mod test {
    use crate::{
        reflector,
        watcher::{Error, Event},
        WatchStreamExt,
    };
    use futures::{poll, stream, StreamExt};
    use k8s_openapi::api::core::v1::Pod;
    use std::{pin::pin, sync::Arc, task::Poll};

    /// Builds a default `Pod` whose only populated field is its name.
    fn testpod(name: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(String::from(name));
        pod
    }

    /// Every watcher event, including errors, comes out of the reflected
    /// stream unchanged while the store is updated along the way.
    #[tokio::test]
    async fn events_are_passed_through() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);

        let (store, writer) = reflector::store_shared(10);
        let mut root = pin!(events.reflect_shared(writer));

        // Nothing has been polled yet, so the store starts out empty.
        assert_eq!(store.len(), 0);
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));

        // The applied object is visible in the store.
        assert_eq!(store.len(), 1);
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        // An error leaves the store contents untouched.
        assert_eq!(store.len(), 1);

        // The store size stays at one during Init / InitApply ...
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(store.len(), 1);

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(store.len(), 1);

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(store.len(), 1);

        // ... and only reflects both objects once InitDone has been seen.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        assert_eq!(store.len(), 2);

        // The input is exhausted.
        assert!(matches!(poll!(root.next()), Poll::Ready(None)));
        assert_eq!(store.len(), 2);
    }

    /// A subscriber receives objects touched by Apply and by a completed
    /// re-list, and terminates once the root stream has ended.
    #[tokio::test]
    async fn readers_yield_touched_objects() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Delete(foo.clone())),
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut handle = pin!(writer.subscribe().unwrap());
        let mut root = pin!(events.reflect_shared(writer));

        // A delete yields nothing on the subscriber side.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Delete(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Pending);

        // An apply makes the object available to the subscriber.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Ready(Some(Arc::clone(&foo))));

        // Errors are not forwarded to the subscriber.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert!(matches!(poll!(handle.next()), Poll::Pending));

        // Drive the complete re-list through the root stream.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));

        // Both re-listed objects reach the subscriber.
        assert!(matches!(poll!(handle.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(handle.next()), Poll::Ready(Some(_))));

        // Once the root stream ends, so does the subscriber.
        assert!(matches!(poll!(root.next()), Poll::Ready(None)));
        assert_eq!(poll!(handle.next()), Poll::Ready(None));
    }

    /// Dropping the root stream still lets a subscriber drain what was
    /// already broadcast before it terminates.
    #[tokio::test]
    async fn readers_yield_when_tx_drops() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let mut handle = pin!(writer.subscribe().unwrap());
        // Boxed rather than stack-pinned so that it can be dropped explicitly.
        let mut root = Box::pin(events.reflect_shared(writer));

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Ready(Some(Arc::clone(&foo))));

        // Nothing further is queued for the subscriber yet.
        assert_eq!(poll!(handle.next()), Poll::Pending);

        // The subscriber stays pending through Init and both InitApply events.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(poll!(handle.next()), Poll::Pending);

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Pending);

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Pending);

        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        drop(root);

        // The two queued objects are still delivered, then the stream ends.
        assert!(matches!(poll!(handle.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(handle.next()), Poll::Ready(Some(_))));
        assert_eq!(poll!(handle.next()), Poll::Ready(None));
    }

    /// With a buffer of one, the root stream stays pending until every
    /// subscriber has consumed the outstanding item.
    #[tokio::test]
    async fn reflect_applies_backpressure() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let events = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Apply(bar.clone())),
            Ok(Event::Apply(foo.clone())),
        ]);

        let foo = Arc::new(foo);
        let bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(1);
        let mut handle = pin!(writer.subscribe().unwrap());
        let mut slow_handle = pin!(writer.subscribe().unwrap());
        let mut root = pin!(events.reflect_shared(writer));

        // Nothing has been broadcast yet.
        assert_eq!(poll!(handle.next()), Poll::Pending);
        assert_eq!(poll!(slow_handle.next()), Poll::Pending);

        // The first apply goes through and the fast subscriber picks it up.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Ready(Some(Arc::clone(&foo))));

        // The slow subscriber has not read yet, so the root cannot progress.
        assert!(matches!(poll!(root.next()), Poll::Pending));

        // The fast subscriber has nothing new to read meanwhile.
        assert_eq!(poll!(handle.next()), Poll::Pending);

        // Once the slow subscriber catches up ...
        assert_eq!(
            poll!(slow_handle.next()),
            Poll::Ready(Some(Arc::clone(&foo)))
        );

        // ... the root stream moves on to the next event.
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(handle.next()), Poll::Ready(Some(Arc::clone(&bar))));
        assert!(matches!(poll!(root.next()), Poll::Pending));
        assert_eq!(
            poll!(slow_handle.next()),
            Poll::Ready(Some(Arc::clone(&bar)))
        );
        assert!(matches!(
            poll!(root.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert!(matches!(poll!(root.next()), Poll::Ready(None)));
        assert_eq!(poll!(handle.next()), Poll::Ready(Some(Arc::clone(&foo))));
        assert_eq!(
            poll!(slow_handle.next()),
            Poll::Ready(Some(Arc::clone(&foo)))
        );

        // Both subscribers terminate after draining the final item.
        assert_eq!(poll!(handle.next()), Poll::Ready(None));
        assert_eq!(poll!(slow_handle.next()), Poll::Ready(None));
    }
}