igd_next/aio/
async_std.rs

1//! Async-std abstraction for the aio [`Gateway`].
2
3use std::collections::HashMap;
4use std::net::SocketAddr;
5
6use async_std::{future::timeout, net::UdpSocket};
7use async_trait::async_trait;
8use futures::prelude::*;
9use log::debug;
10
11use super::{Provider, HEADER_NAME, MAX_RESPONSE_SIZE};
12use crate::aio::Gateway;
13use crate::common::options::{DEFAULT_TIMEOUT, RESPONSE_TIMEOUT};
14use crate::common::{messages, parsing, SearchOptions};
15use crate::errors::SearchError;
16use crate::RequestError;
17
/// Marker type selecting the async-std backed [`Provider`] implementation
/// used by the aio [`Gateway`].
///
/// The type carries no state; it only exists to pick the implementation.
#[derive(Clone, Debug)]
pub struct AsyncStd;
21
22#[async_trait]
23impl Provider for AsyncStd {
24    async fn send_async(url: &str, action: &str, body: &str) -> Result<String, RequestError> {
25        Ok(surf::post(url)
26            .header(HEADER_NAME, action)
27            .content_type("text/xml")
28            .body(body)
29            .recv_string()
30            .await?)
31    }
32}
33
34/// Search for a gateway with the provided options.
35pub async fn search_gateway(options: SearchOptions) -> Result<Gateway<AsyncStd>, SearchError> {
36    let search_timeout = options.timeout.unwrap_or(DEFAULT_TIMEOUT);
37    match timeout(search_timeout, search_gateway_inner(options)).await {
38        Ok(Ok(gateway)) => Ok(gateway),
39        Ok(Err(err)) => Err(err),
40        Err(_err) => {
41            // Timeout
42            Err(SearchError::NoResponseWithinTimeout)
43        }
44    }
45}
46async fn search_gateway_inner(options: SearchOptions) -> Result<Gateway<AsyncStd>, SearchError> {
47    // Create socket for future calls.
48    let mut socket = UdpSocket::bind(&options.bind_addr).await?;
49
50    send_search_request(&mut socket, options.broadcast_address).await?;
51
52    let response_timeout = options.single_search_timeout.unwrap_or(RESPONSE_TIMEOUT);
53
54    loop {
55        let search_response = receive_search_response(&mut socket);
56
57        // Receive search response
58        let (response_body, from) = match timeout(response_timeout, search_response).await {
59            Ok(Ok(v)) => v,
60            Ok(Err(err)) => {
61                debug!("error while receiving broadcast response: {err}");
62                continue;
63            }
64            Err(_) => {
65                debug!("timeout while receiving broadcast response");
66                continue;
67            }
68        };
69
70        let (addr, root_url) = match handle_broadcast_resp(&from, &response_body) {
71            Ok(v) => v,
72            Err(e) => {
73                debug!("error handling broadcast response: {}", e);
74                continue;
75            }
76        };
77
78        let (control_schema_url, control_url) = match get_control_urls(&addr, &root_url).await {
79            Ok(v) => v,
80            Err(e) => {
81                debug!("error getting control URLs: {}", e);
82                continue;
83            }
84        };
85
86        let control_schema = match get_control_schemas(&addr, &control_schema_url).await {
87            Ok(v) => v,
88            Err(e) => {
89                debug!("error getting control schemas: {}", e);
90                continue;
91            }
92        };
93
94        return Ok(Gateway {
95            addr,
96            root_url,
97            control_url,
98            control_schema_url,
99            control_schema,
100            provider: AsyncStd,
101        });
102    }
103}
104
105// Create a new search.
106async fn send_search_request(socket: &mut UdpSocket, addr: SocketAddr) -> Result<(), SearchError> {
107    debug!(
108        "sending broadcast request to: {} on interface: {:?}",
109        addr,
110        socket.local_addr()
111    );
112    socket
113        .send_to(messages::SEARCH_REQUEST.as_bytes(), &addr)
114        .map_ok(|_| ())
115        .map_err(SearchError::from)
116        .await
117}
118
119async fn receive_search_response(socket: &mut UdpSocket) -> Result<(Vec<u8>, SocketAddr), SearchError> {
120    let mut buff = [0u8; MAX_RESPONSE_SIZE];
121    let (n, from) = socket.recv_from(&mut buff).map_err(SearchError::from).await?;
122    debug!("received broadcast response from: {}", from);
123    Ok((buff[..n].to_vec(), from))
124}
125
126// Handle a UDP response message.
127fn handle_broadcast_resp(from: &SocketAddr, data: &[u8]) -> Result<(SocketAddr, String), SearchError> {
128    debug!("handling broadcast response from: {}", from);
129
130    // Convert response to text.
131    let text = std::str::from_utf8(data).map_err(SearchError::from)?;
132
133    // Parse socket address and path.
134    let (addr, root_url) = parsing::parse_search_result(text)?;
135
136    Ok((addr, root_url))
137}
138
139async fn get_control_urls(addr: &SocketAddr, path: &str) -> Result<(String, String), SearchError> {
140    let uri = format!("http://{addr}{path}");
141
142    debug!("requesting control url from: {}", uri);
143
144    let resp = surf::get(uri).recv_bytes().await?;
145
146    debug!("handling control response from: {}", addr);
147    let c = std::io::Cursor::new(&resp);
148    parsing::parse_control_urls(c)
149}
150
151async fn get_control_schemas(
152    addr: &SocketAddr,
153    control_schema_url: &str,
154) -> Result<HashMap<String, Vec<String>>, SearchError> {
155    let uri = format!("http://{addr}{control_schema_url}");
156
157    debug!("requesting control schema from: {uri}");
158    let resp = surf::get(uri).recv_bytes().await?;
159
160    debug!("handling schema response from: {addr}");
161    let c = std::io::Cursor::new(&resp);
162    parsing::parse_schemas(c)
163}