1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::fmt;
use anyhow::Result;
use async_stream::stream;
use futures::{Stream, StreamExt};
use iroh_rpc_types::{gateway::*, VersionRequest, WatchRequest};
use crate::{StatusType, HEALTH_POLL_WAIT};
#[derive(Clone)]
pub struct GatewayClient {
client: quic_rpc::RpcClient<GatewayService, crate::ChannelTypes>,
}
impl fmt::Debug for GatewayClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GatewayClient2")
.field("client", &self.client)
.finish()
}
}
impl GatewayClient {
pub async fn new(addr: GatewayAddr) -> anyhow::Result<Self> {
let client = crate::open_client::<GatewayService>(addr).await?;
Ok(Self { client })
}
#[tracing::instrument(skip(self))]
pub async fn version(&self) -> Result<String> {
let res = self.client.rpc(VersionRequest).await?;
Ok(res.version)
}
#[tracing::instrument(skip(self))]
pub async fn check(&self) -> (StatusType, String) {
match self.version().await {
Ok(version) => (StatusType::Serving, version),
Err(_) => (StatusType::Down, String::new()),
}
}
#[tracing::instrument(skip(self))]
pub async fn watch(&self) -> impl Stream<Item = (StatusType, String)> {
let client = self.client.clone();
stream! {
loop {
let res = client.server_streaming(WatchRequest).await;
if let Ok(mut res) = res {
while let Some(Ok(version)) = res.next().await {
yield (StatusType::Serving, version.version);
}
}
yield (StatusType::Down, String::new());
tokio::time::sleep(HEALTH_POLL_WAIT).await;
}
}
}
}