norgopolis_module/
invoker_service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use std::pin::Pin;

use crate::module_communication::{invoker_server::Invoker, Invocation, MessagePack};
use futures::Stream;
use tonic::{Request, Response, Status};

/// A callable backend that [`InvokerService`] forwards invocations to.
///
/// An implementor receives a function name plus an optional [`MessagePack`]
/// argument payload and answers with a stream of [`MessagePack`] items.
// NOTE(review): `crate::async_trait` is presumably a re-export of the
// `async_trait` attribute macro; confirm at the crate root.
#[crate::async_trait]
pub trait Service {
    /// Stream type returned by [`Service::call`].
    ///
    /// Each item is either a [`MessagePack`] value or a [`Status`] error, and
    /// the stream itself must be `Send`.
    type Stream: Stream<Item = Result<MessagePack, Status>> + Send;

    /// Invokes the function identified by `fn_name`.
    ///
    /// * `fn_name` - name of the function to run.
    /// * `args` - optional argument payload; `None` when the caller supplied
    ///   no arguments.
    ///
    /// # Errors
    ///
    /// Returns a [`Status`] when the call cannot produce a stream at all.
    /// Failures that occur later are reported as `Err` items on the stream.
    async fn call(
        &self,
        fn_name: String,
        args: Option<MessagePack>,
    ) -> Result<Self::Stream, Status>;
}

/// Adapter that pairs a [`Service`] implementation with a notification channel
/// so it can be served through the [`Invoker`] trait.
pub struct InvokerService<T> {
    /// The wrapped backend that actually handles each invocation.
    service: T,
    /// Channel on which `invoke` sends a `()` for every incoming request.
    // NOTE(review): presumably an activity/keepalive signal for whoever holds
    // the receiving end; confirm against the code that creates the channel.
    tx: tokio::sync::mpsc::UnboundedSender<()>,
}

impl<T: Service> InvokerService<T> {
    /// Creates an invoker that delegates to `service`.
    ///
    /// * `service` - backend that will receive every forwarded call.
    /// * `tx` - sender stored alongside the service; both values are kept
    ///   as-is and nothing is sent on the channel during construction.
    pub fn new(service: T, tx: tokio::sync::mpsc::UnboundedSender<()>) -> Self {
        Self { service, tx }
    }
}

#[tonic::async_trait]
impl<T> Invoker for InvokerService<T>
where
    T: Service + Sync + Send + 'static,
{
    type InvokeStream = Pin<Box<dyn Stream<Item = Result<MessagePack, Status>> + Send>>;

    async fn invoke(
        &self,
        request: Request<Invocation>,
    ) -> Result<Response<Self::InvokeStream>, Status> {
        let invocation = request.into_inner();

        let _ = self.tx.send(());

        let response = self
            .service
            .call(invocation.function_name, invocation.args)
            .await?;

        Ok(Response::new(Box::pin(response)))
    }
}