1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use futures::stream::StreamExt;
use pinned::mpsc;
use pinned::mpsc::UnboundedReceiver;

use super::traits::Oneshot;
use super::worker::OneshotWorker;
use crate::actor::{WorkerBridge, WorkerSpawner};
use crate::codec::Codec;

/// A connection manager for components interaction with oneshot workers.
#[derive(Debug)]
pub struct OneshotBridge<N>
where
    N: Oneshot + 'static,
{
    inner: WorkerBridge<OneshotWorker<N>>,
    rx: UnboundedReceiver<N::Output>,
}

impl<N> OneshotBridge<N>
where
    N: Oneshot + 'static,
{
    #[inline(always)]
    pub(crate) fn new(
        inner: WorkerBridge<OneshotWorker<N>>,
        rx: UnboundedReceiver<N::Output>,
    ) -> Self {
        Self { inner, rx }
    }

    #[inline(always)]
    pub(crate) fn register_callback<CODEC>(
        spawner: &mut WorkerSpawner<OneshotWorker<N>, CODEC>,
    ) -> UnboundedReceiver<N::Output>
    where
        CODEC: Codec,
    {
        let (tx, rx) = mpsc::unbounded();
        spawner.callback(move |output| {
            let _ = tx.send_now(output);
        });

        rx
    }

    /// Forks the bridge.
    ///
    /// This method creates a new bridge that can be used to execute tasks on the same worker instance.
    pub fn fork(&self) -> Self {
        let (tx, rx) = mpsc::unbounded();
        let inner = self.inner.fork(Some(move |output| {
            let _ = tx.send_now(output);
        }));

        Self { inner, rx }
    }

    /// Run the the current oneshot worker once in the current worker instance.
    pub async fn run(&mut self, input: N::Input) -> N::Output {
        // &mut self guarantees that the bridge will be
        // exclusively borrowed during the time the oneshot worker is running.
        self.inner.send(input);

        // For each bridge, there can only be 1 active task running on the worker instance.
        // The next output will be the output for the input that we just sent.
        self.rx
            .next()
            .await
            .expect("failed to receive result from worker")
    }
}