hickory_proto/multicast/
mdns_client_stream.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright 2015-2018 Benjamin Fry <benjaminfry@me.com>
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// https://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// https://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::fmt::{self, Display};
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::future::{FutureExt, TryFutureExt};
use futures_util::stream::{Stream, StreamExt, TryStreamExt};

use crate::error::ProtoError;
use crate::multicast::mdns_stream::{MDNS_IPV4, MDNS_IPV6};
use crate::multicast::{MdnsQueryType, MdnsStream};
use crate::runtime::TokioTime;
use crate::xfer::{DnsClientStream, SerialMessage};
use crate::BufDnsStreamHandle;

/// A UDP client stream of DNS binary packets
#[must_use = "futures do nothing unless polled"]
pub struct MdnsClientStream {
    mdns_stream: MdnsStream,
}

impl MdnsClientStream {
    /// associates the socket to the well-known ipv4 multicast address
    pub fn new_ipv4(
        mdns_query_type: MdnsQueryType,
        packet_ttl: Option<u32>,
        ipv4_if: Option<Ipv4Addr>,
    ) -> (MdnsClientConnect, BufDnsStreamHandle) {
        Self::new(*MDNS_IPV4, mdns_query_type, packet_ttl, ipv4_if, None)
    }

    /// associates the socket to the well-known ipv6 multicast address
    pub fn new_ipv6(
        mdns_query_type: MdnsQueryType,
        packet_ttl: Option<u32>,
        ipv6_if: Option<u32>,
    ) -> (MdnsClientConnect, BufDnsStreamHandle) {
        Self::new(*MDNS_IPV6, mdns_query_type, packet_ttl, None, ipv6_if)
    }

    /// it is expected that the resolver wrapper will be responsible for creating and managing
    ///  new UdpClients such that each new client would have a random port (reduce chance of cache
    ///  poisoning)
    ///
    /// # Return
    ///
    /// a tuple of a Future Stream which will handle sending and receiving messages, and a
    ///  handle which can be used to send messages into the stream.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        mdns_addr: SocketAddr,
        mdns_query_type: MdnsQueryType,
        packet_ttl: Option<u32>,
        ipv4_if: Option<Ipv4Addr>,
        ipv6_if: Option<u32>,
    ) -> (MdnsClientConnect, BufDnsStreamHandle) {
        let (stream_future, sender) =
            MdnsStream::new(mdns_addr, mdns_query_type, packet_ttl, ipv4_if, ipv6_if);

        let stream_future = stream_future
            .map_ok(move |mdns_stream| Self { mdns_stream })
            .map_err(ProtoError::from);

        let new_future = Box::new(stream_future);
        let new_future = MdnsClientConnect(new_future);

        (new_future, sender)
    }
}

impl Display for MdnsClientStream {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        write!(formatter, "mDNS({})", self.mdns_stream.multicast_addr())
    }
}

impl DnsClientStream for MdnsClientStream {
    type Time = TokioTime;

    fn name_server_addr(&self) -> SocketAddr {
        self.mdns_stream.multicast_addr()
    }
}

impl Stream for MdnsClientStream {
    type Item = Result<SerialMessage, ProtoError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mdns_stream = &mut self.as_mut().mdns_stream;
        mdns_stream.map_err(ProtoError::from).poll_next_unpin(cx)
        // match ready!(self.mdns_stream.poll_next_unpin(cx).map_err(ProtoError::from)) {
        //     Some(serial_message) => {
        //         // TODO: for mDNS queries could come from anywhere. It's not clear that there is anything
        //         //       we can validate in this case.
        //         Poll::Ready(Some(Ok(serial_message)))
        //     }
        //     None => Poll::Ready(None),
        // }
    }
}

/// A future that resolves to an MdnsClientStream
pub struct MdnsClientConnect(
    Box<dyn Future<Output = Result<MdnsClientStream, ProtoError>> + Send + Unpin>,
);

impl Future for MdnsClientConnect {
    type Output = Result<MdnsClientStream, ProtoError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().poll_unpin(cx)
    }
}