1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ok, Ready};
use futures_core::ready;

use super::{
    error::ConnectError,
    resolver::{Resolver, ResolverService},
    tcp::{TcpConnector, TcpConnectorService},
    ConnectInfo, Connection, Host,
};

/// Combined resolver and TCP connector service factory.
///
/// Used to create [`ConnectorService`]s which receive connection information, resolve DNS if
/// required, and return a TCP stream.
#[derive(Clone, Default)]
pub struct Connector {
    /// Resolver from which the resolver service of every [`ConnectorService`] is built.
    resolver: Resolver,
}

impl Connector {
    /// Creates a connector factory that builds its services from `resolver`.
    pub fn new(resolver: Resolver) -> Self {
        Self { resolver }
    }

    /// Builds a new [`ConnectorService`].
    ///
    /// The TCP half comes from a default [`TcpConnector`]; the resolver half comes from the
    /// resolver stored in this factory.
    pub fn service(&self) -> ConnectorService {
        let tcp = TcpConnector::default().service();
        let resolver = self.resolver.service();

        ConnectorService { tcp, resolver }
    }
}

/// Factory implementation: every call to `new_service` yields a fresh [`ConnectorService`].
impl<R: Host> ServiceFactory<ConnectInfo<R>> for Connector {
    type Response = Connection<R, TcpStream>;
    type Error = ConnectError;
    type Config = ();
    type Service = ConnectorService;
    type InitError = ();
    type Future = Ready<Result<Self::Service, Self::InitError>>;

    /// Returns an already-completed future holding a newly built service.
    ///
    /// The unit config carries no information and is ignored.
    fn new_service(&self, _config: ()) -> Self::Future {
        let service = self.service();
        ok(service)
    }
}

/// Combined resolver and TCP connector service.
///
/// Service implementation receives connection information, resolves DNS if required, and returns
/// a TCP stream.
#[derive(Clone, Default)]
pub struct ConnectorService {
    /// Service that performs the TCP connect step; a copy is handed to each response future.
    tcp: TcpConnectorService,
    /// Service that performs the resolve step, which is the first step of every call.
    resolver: ResolverService,
}

/// Service implementation: each request becomes a [`ConnectServiceResponse`] future that runs
/// the resolve step followed by the TCP connect step.
impl<R: Host> Service<ConnectInfo<R>> for ConnectorService {
    type Response = Connection<R, TcpStream>;
    type Error = ConnectError;
    type Future = ConnectServiceResponse<R>;

    // Readiness handling is delegated entirely to the `always_ready!` macro.
    actix_service::always_ready!();

    /// Starts the resolve step for `req` and returns the future that drives the whole chain.
    fn call(&self, req: ConnectInfo<R>) -> Self::Future {
        // The chain always begins with resolution; the TCP service is carried along so the
        // returned future can start the connect step itself later.
        let resolving = self.resolver.call(req);

        ConnectServiceResponse {
            fut: ConnectFut::Resolve(resolving),
            tcp: self.tcp,
        }
    }
}

/// Chains futures of resolve and connect steps.
///
/// [`ConnectorService`] creates the chain in the `Resolve` state; [`ConnectServiceResponse`]
/// swaps it to `Connect` once the resolve future has produced its connection information.
pub(crate) enum ConnectFut<R: Host> {
    /// Resolve step in progress: the future returned by [`ResolverService`].
    Resolve(<ResolverService as Service<ConnectInfo<R>>>::Future),
    /// Connect step in progress: the future returned by [`TcpConnectorService`].
    Connect(<TcpConnectorService as Service<ConnectInfo<R>>>::Future),
}

/// Container for the intermediate states of [`ConnectFut`].
///
/// Each variant carries the successful output of one step of the chain.
pub(crate) enum ConnectFutState<R: Host> {
    /// Output of the resolve step: connection information ready for the TCP connector.
    Resolved(ConnectInfo<R>),
    /// Output of the connect step: the established connection.
    Connected(Connection<R, TcpStream>),
}

impl<R: Host> ConnectFut<R> {
    fn poll_connect(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<ConnectFutState<R>, ConnectError>> {
        match self {
            ConnectFut::Resolve(ref mut fut) => {
                Pin::new(fut).poll(cx).map_ok(ConnectFutState::Resolved)
            }

            ConnectFut::Connect(ref mut fut) => {
                Pin::new(fut).poll(cx).map_ok(ConnectFutState::Connected)
            }
        }
    }
}

/// Future returned by [`ConnectorService`] for a single connection request.
///
/// Drives the resolve step and then the TCP connect step, completing with the resulting
/// connection or with the first [`ConnectError`] either step reports.
pub struct ConnectServiceResponse<R: Host> {
    /// Future of the step currently in progress.
    fut: ConnectFut<R>,
    /// TCP connector service used to start the connect step once resolution has finished.
    tcp: TcpConnectorService,
}

impl<R: Host> Future for ConnectServiceResponse<R> {
    type Output = Result<Connection<R, TcpStream>, ConnectError>;

    /// Advances the resolve-then-connect chain as far as it can go without blocking.
    ///
    /// Returns `Pending` when the current step is not ready, the error as soon as a step fails,
    /// and the connection once the connect step completes.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let resolved = match ready!(self.fut.poll_connect(cx))? {
                ConnectFutState::Connected(connection) => return Poll::Ready(Ok(connection)),
                ConnectFutState::Resolved(info) => info,
            };

            // Resolution finished: start the TCP connect step, then loop so the new future is
            // polled right away rather than waiting for another wake-up.
            self.fut = ConnectFut::Connect(self.tcp.call(resolved));
        }
    }
}