gloo_worker/oneshot/
bridge.rs1use futures::stream::StreamExt;
2use pinned::mpsc;
3use pinned::mpsc::UnboundedReceiver;
4
5use super::traits::Oneshot;
6use super::worker::OneshotWorker;
7use crate::actor::{WorkerBridge, WorkerSpawner};
8use crate::codec::Codec;
9
10#[derive(Debug)]
12pub struct OneshotBridge<N>
13where
14 N: Oneshot + 'static,
15{
16 inner: WorkerBridge<OneshotWorker<N>>,
17 rx: UnboundedReceiver<N::Output>,
18}
19
20impl<N> OneshotBridge<N>
21where
22 N: Oneshot + 'static,
23{
24 #[inline(always)]
25 pub(crate) fn new(
26 inner: WorkerBridge<OneshotWorker<N>>,
27 rx: UnboundedReceiver<N::Output>,
28 ) -> Self {
29 Self { inner, rx }
30 }
31
32 #[inline(always)]
33 pub(crate) fn register_callback<CODEC>(
34 spawner: &mut WorkerSpawner<OneshotWorker<N>, CODEC>,
35 ) -> UnboundedReceiver<N::Output>
36 where
37 CODEC: Codec,
38 {
39 let (tx, rx) = mpsc::unbounded();
40 spawner.callback(move |output| {
41 let _ = tx.send_now(output);
42 });
43
44 rx
45 }
46
47 pub fn fork(&self) -> Self {
51 let (tx, rx) = mpsc::unbounded();
52 let inner = self.inner.fork(Some(move |output| {
53 let _ = tx.send_now(output);
54 }));
55
56 Self { inner, rx }
57 }
58
59 pub async fn run(&mut self, input: N::Input) -> N::Output {
61 self.inner.send(input);
64
65 self.rx
68 .next()
69 .await
70 .expect("failed to receive result from worker")
71 }
72}