gloo_worker/oneshot/
bridge.rs

1use futures::stream::StreamExt;
2use pinned::mpsc;
3use pinned::mpsc::UnboundedReceiver;
4
5use super::traits::Oneshot;
6use super::worker::OneshotWorker;
7use crate::actor::{WorkerBridge, WorkerSpawner};
8use crate::codec::Codec;
9
10/// A connection manager for components interaction with oneshot workers.
11#[derive(Debug)]
12pub struct OneshotBridge<N>
13where
14    N: Oneshot + 'static,
15{
16    inner: WorkerBridge<OneshotWorker<N>>,
17    rx: UnboundedReceiver<N::Output>,
18}
19
20impl<N> OneshotBridge<N>
21where
22    N: Oneshot + 'static,
23{
24    #[inline(always)]
25    pub(crate) fn new(
26        inner: WorkerBridge<OneshotWorker<N>>,
27        rx: UnboundedReceiver<N::Output>,
28    ) -> Self {
29        Self { inner, rx }
30    }
31
32    #[inline(always)]
33    pub(crate) fn register_callback<CODEC>(
34        spawner: &mut WorkerSpawner<OneshotWorker<N>, CODEC>,
35    ) -> UnboundedReceiver<N::Output>
36    where
37        CODEC: Codec,
38    {
39        let (tx, rx) = mpsc::unbounded();
40        spawner.callback(move |output| {
41            let _ = tx.send_now(output);
42        });
43
44        rx
45    }
46
47    /// Forks the bridge.
48    ///
49    /// This method creates a new bridge that can be used to execute tasks on the same worker instance.
50    pub fn fork(&self) -> Self {
51        let (tx, rx) = mpsc::unbounded();
52        let inner = self.inner.fork(Some(move |output| {
53            let _ = tx.send_now(output);
54        }));
55
56        Self { inner, rx }
57    }
58
59    /// Run the the current oneshot worker once in the current worker instance.
60    pub async fn run(&mut self, input: N::Input) -> N::Output {
61        // &mut self guarantees that the bridge will be
62        // exclusively borrowed during the time the oneshot worker is running.
63        self.inner.send(input);
64
65        // For each bridge, there can only be 1 active task running on the worker instance.
66        // The next output will be the output for the input that we just sent.
67        self.rx
68            .next()
69            .await
70            .expect("failed to receive result from worker")
71    }
72}