rumqttc_dev_patched/
proxy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use crate::eventloop::socket_connect;
use crate::framed::AsyncReadWrite;
use crate::NetworkOptions;

use std::io;

#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
use crate::{tls, TlsConfiguration};

#[derive(Clone, Debug)]
pub struct Proxy {
    pub ty: ProxyType,
    pub auth: ProxyAuth,
    pub addr: String,
    pub port: u16,
}

#[derive(Clone, Debug)]
pub enum ProxyType {
    Http,
    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
    Https(TlsConfiguration),
}

#[derive(Clone, Debug)]
pub enum ProxyAuth {
    None,
    Basic { username: String, password: String },
}

#[derive(Debug, thiserror::Error)]
pub enum ProxyError {
    #[error("Socket connect: {0}.")]
    Io(#[from] io::Error),
    #[error("Proxy connect: {0}.")]
    Proxy(#[from] async_http_proxy::HttpError),

    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
    #[error("Tls connect: {0}.")]
    Tls(#[from] tls::Error),
}

impl Proxy {
    pub(crate) async fn connect(
        self,
        broker_addr: &str,
        broker_port: u16,
        network_options: NetworkOptions,
    ) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
        let proxy_addr = format!("{}:{}", self.addr, self.port);

        let tcp: Box<dyn AsyncReadWrite> =
            Box::new(socket_connect(proxy_addr, network_options).await?);
        let mut tcp = match self.ty {
            ProxyType::Http => tcp,
            #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
            ProxyType::Https(tls_config) => {
                tls::tls_connect(&self.addr, self.port, &tls_config, tcp).await?
            }
        };
        self.auth.auth(broker_addr, broker_port, &mut tcp).await?;
        Ok(tcp)
    }
}

impl ProxyAuth {
    async fn auth(
        self,
        host: &str,
        port: u16,
        tcp_stream: &mut Box<dyn AsyncReadWrite>,
    ) -> Result<(), ProxyError> {
        match self {
            Self::None => async_http_proxy::http_connect_tokio(tcp_stream, host, port).await?,
            Self::Basic { username, password } => {
                async_http_proxy::http_connect_tokio_with_basic_auth(
                    tcp_stream, host, port, &username, &password,
                )
                .await?
            }
        }
        Ok(())
    }
}