// wasmer_backend_api/subscription.rs

use crate::{
    types::{PackageVersionReadySubscription, PackageVersionReadySubscriptionVariables},
    WasmerClient,
};
use anyhow::Context;
use async_tungstenite::tungstenite::client::IntoClientRequest;
use cynic::SubscriptionBuilder;
use graphql_ws_client::Subscription;
use reqwest::header::HeaderValue;
use std::future::IntoFuture;

/// Opens a GraphQL-over-WebSocket connection to the client's GraphQL endpoint
/// and starts a [`PackageVersionReadySubscription`] for `package_version_id`.
///
/// The endpoint URL is taken from `client.graphql_endpoint()`; an `http`
/// scheme is rewritten to `ws` and `https` to `wss`, any other scheme is left
/// untouched. The request carries the `graphql-transport-ws` sub-protocol,
/// the client's user agent and, when `client.auth_token()` returns a token,
/// a `Bearer` authorization header.
///
/// The connection actor is spawned onto the current Tokio runtime and its
/// join handle is discarded, so this must be called from within a Tokio
/// runtime.
///
/// # Errors
///
/// Returns an error if the URL cannot be turned into a WebSocket request, if
/// the auth token is not a valid header value, if the WebSocket connection or
/// the GraphQL handshake fails, or if the subscription cannot be started.
pub async fn package_version_ready(
    client: &WasmerClient,
    package_version_id: &str,
) -> anyhow::Result<
    Subscription<
        cynic::StreamingOperation<
            PackageVersionReadySubscription,
            PackageVersionReadySubscriptionVariables,
        >,
    >,
> {
    let mut url = client.graphql_endpoint().clone();

    // Map the HTTP(S) endpoint onto its WebSocket counterpart. Endpoints that
    // already use another scheme (e.g. `ws`/`wss`) are passed through as-is.
    let ws_scheme = match url.scheme() {
        "http" => Some("ws"),
        "https" => Some("wss"),
        _ => None,
    };
    if let Some(scheme) = ws_scheme {
        url.set_scheme(scheme).map_err(|()| {
            anyhow::anyhow!("could not switch the GraphQL endpoint scheme to '{}'", scheme)
        })?;
    }

    let mut req = url
        .as_str()
        .into_client_request()
        .context("could not build the websocket request")?;

    let headers = req.headers_mut();

    // The protocol name is a compile-time constant, so no fallible parsing is
    // needed.
    headers.insert(
        reqwest::header::SEC_WEBSOCKET_PROTOCOL,
        HeaderValue::from_static("graphql-transport-ws"),
    );

    if let Some(token) = client.auth_token() {
        let mut bearer = HeaderValue::from_str(&format!("Bearer {}", token))
            .context("auth token is not a valid header value")?;
        // Keep the credential out of `Debug` output of the request/headers.
        bearer.set_sensitive(true);
        headers.insert(reqwest::header::AUTHORIZATION, bearer);
    }

    headers.insert(reqwest::header::USER_AGENT, client.user_agent.clone());

    let (connection, _resp) = async_tungstenite::tokio::connect_async(req)
        .await
        .context("could not connect")?;

    let (ws_client, actor) = graphql_ws_client::Client::build(connection)
        .await
        .context("could not initialise the GraphQL websocket client")?;

    // The join handle is dropped on purpose: the actor keeps running as a
    // detached task for as long as the runtime polls it.
    tokio::spawn(actor.into_future());

    let operation =
        PackageVersionReadySubscription::build(PackageVersionReadySubscriptionVariables {
            package_version_id: cynic::Id::new(package_version_id),
        });

    let stream = ws_client
        .subscribe(operation)
        .await
        .context("could not start the package version subscription")?;

    Ok(stream)
}