1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::fmt;

use anyhow::Result;
use async_stream::stream;
use futures::{Stream, StreamExt};
use iroh_rpc_types::{gateway::*, VersionRequest, WatchRequest};

use crate::{StatusType, HEALTH_POLL_WAIT};

/// RPC client handle for the gateway service.
///
/// Wraps a `quic_rpc::RpcClient` specialised for `GatewayService` over the
/// crate's `ChannelTypes`. `Clone` is derived, so a handle can be duplicated.
#[derive(Clone)]
pub struct GatewayClient {
    /// Underlying RPC client that requests are sent through.
    client: quic_rpc::RpcClient<GatewayService, crate::ChannelTypes>,
}

/// Diagnostic formatting: renders as a `GatewayClient` struct with its inner
/// `client` field.
impl fmt::Debug for GatewayClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Use the actual type name so debug output and logs identify this
        // type correctly (the label used to read "GatewayClient2").
        f.debug_struct("GatewayClient")
            .field("client", &self.client)
            .finish()
    }
}

impl GatewayClient {
    /// Opens an RPC client for the gateway service at `addr`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `crate::open_client`.
    pub async fn new(addr: GatewayAddr) -> anyhow::Result<Self> {
        Ok(Self {
            client: crate::open_client::<GatewayService>(addr).await?,
        })
    }

    /// Sends a `VersionRequest` and returns the `version` field of the reply.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC call fails.
    #[tracing::instrument(skip(self))]
    pub async fn version(&self) -> Result<String> {
        let reply = self.client.rpc(VersionRequest).await?;
        Ok(reply.version)
    }

    /// Probes the gateway once via [`GatewayClient::version`].
    ///
    /// Returns `(StatusType::Serving, version)` when the call succeeds and
    /// `(StatusType::Down, "")` when it fails; the error itself is discarded.
    #[tracing::instrument(skip(self))]
    pub async fn check(&self) -> (StatusType, String) {
        self.version().await.map_or_else(
            |_| (StatusType::Down, String::new()),
            |version| (StatusType::Serving, version),
        )
    }

    /// Returns a stream of status updates that never ends on its own.
    ///
    /// The stream works on a clone of the inner client. Each round it opens a
    /// server-streaming `WatchRequest` and yields `(StatusType::Serving, version)`
    /// for every successful item. When the request cannot be opened, or the
    /// response stream ends or produces an error item, it yields
    /// `(StatusType::Down, "")`, sleeps for `HEALTH_POLL_WAIT`, and tries again.
    #[tracing::instrument(skip(self))]
    pub async fn watch(&self) -> impl Stream<Item = (StatusType, String)> {
        let rpc = self.client.clone();
        stream! {
            loop {
                let attempt = rpc.server_streaming(WatchRequest).await;
                if let Ok(mut updates) = attempt {
                    // Forward updates until the stream finishes or errors.
                    loop {
                        match updates.next().await {
                            Some(Ok(update)) => {
                                yield (StatusType::Serving, update.version);
                            }
                            _ => break,
                        }
                    }
                }
                // Reaching this point means the watch is not (or no longer) live.
                yield (StatusType::Down, String::new());
                tokio::time::sleep(HEALTH_POLL_WAIT).await;
            }
        }
    }
}