hickory_proto/xfer/
mod.rs

//! DNS high level transit implementations.
2//!
3//! Primarily there are two types in this module of interest, the `DnsMultiplexer` type and the `DnsHandle` type. `DnsMultiplexer` can be thought of as the state machine responsible for sending and receiving DNS messages. `DnsHandle` is the type given to API users of the `hickory-proto` library to send messages into the `DnsMultiplexer` for delivery. Finally there is the `DnsRequest` type. This allows for customizations, through `DnsRequestOptions`, to the delivery of messages via a `DnsMultiplexer`.
4//!
5//! TODO: this module needs some serious refactoring and normalization.
6
7use std::fmt::{Debug, Display};
8use std::net::SocketAddr;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use futures_channel::mpsc;
13use futures_channel::oneshot;
14use futures_util::future::Future;
15use futures_util::ready;
16use futures_util::stream::{Fuse, Peekable, Stream, StreamExt};
17use tracing::{debug, warn};
18
19use crate::error::*;
20use crate::Time;
21
22mod dns_exchange;
23pub mod dns_handle;
24pub mod dns_multiplexer;
25pub mod dns_request;
26pub mod dns_response;
27#[cfg(feature = "dnssec")]
28#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
29pub mod dnssec_dns_handle;
30pub mod retry_dns_handle;
31mod serial_message;
32
33pub use self::dns_exchange::{
34    DnsExchange, DnsExchangeBackground, DnsExchangeConnect, DnsExchangeSend,
35};
36pub use self::dns_handle::{DnsHandle, DnsStreamHandle};
37pub use self::dns_multiplexer::{DnsMultiplexer, DnsMultiplexerConnect};
38pub use self::dns_request::{DnsRequest, DnsRequestOptions};
39pub use self::dns_response::{DnsResponse, DnsResponseStream};
40#[cfg(feature = "dnssec")]
41#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
42pub use self::dnssec_dns_handle::DnssecDnsHandle;
43pub use self::retry_dns_handle::RetryDnsHandle;
44pub use self::serial_message::SerialMessage;
45
46/// Ignores the result of a send operation and logs and ignores errors
47fn ignore_send<M, T>(result: Result<M, mpsc::TrySendError<T>>) {
48    if let Err(error) = result {
49        if error.is_disconnected() {
50            debug!("ignoring send error on disconnected stream");
51            return;
52        }
53
54        warn!("error notifying wait, possible future leak: {:?}", error);
55    }
56}
57
/// A non-multiplexed stream of Serialized DNS messages
///
/// Implementors are a [`Stream`] whose items are `Result<SerialMessage, ProtoError>`, and must
/// additionally be [`Display`] and [`Send`].
pub trait DnsClientStream:
    Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
{
    /// Time implementation for this impl
    type Time: Time;

    /// The remote name server address
    fn name_server_addr(&self) -> SocketAddr;
}
68
/// Receiver handle for peekable fused SerialMessage channel
///
/// This is an `mpsc::Receiver<SerialMessage>` wrapped first in `Fuse` and then in `Peekable`;
/// `BufDnsStreamHandle::new` returns a value of this type alongside the handle.
pub type StreamReceiver = Peekable<Fuse<mpsc::Receiver<SerialMessage>>>;
71
/// Buffer size handed to `mpsc::channel` when `BufDnsStreamHandle::new` creates its channel.
const CHANNEL_BUFFER_SIZE: usize = 32;
73
/// A buffering stream bound to a `SocketAddr`
///
/// This stream handle ensures that all messages sent via this handle have the remote_addr set as the destination for the packet
#[derive(Clone)]
pub struct BufDnsStreamHandle {
    /// Address stamped onto every message sent through this handle.
    remote_addr: SocketAddr,
    /// Sending half of the `SerialMessage` channel; the receiving half is returned by `new`.
    sender: mpsc::Sender<SerialMessage>,
}
82
83impl BufDnsStreamHandle {
84    /// Constructs a new Buffered Stream Handle, used for sending data to the DNS peer.
85    ///
86    /// # Arguments
87    ///
88    /// * `remote_addr` - the address of the remote DNS system (client or server)
89    /// * `sender` - the handle being used to send data to the server
90    pub fn new(remote_addr: SocketAddr) -> (Self, StreamReceiver) {
91        let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
92        let receiver = receiver.fuse().peekable();
93
94        let this = Self {
95            remote_addr,
96            sender,
97        };
98
99        (this, receiver)
100    }
101
102    /// Associates a different remote address for any responses.
103    ///
104    /// This is mainly useful in server use cases where the incoming address is only known after receiving a packet.
105    pub fn with_remote_addr(&self, remote_addr: SocketAddr) -> Self {
106        Self {
107            remote_addr,
108            sender: self.sender.clone(),
109        }
110    }
111}
112
113impl DnsStreamHandle for BufDnsStreamHandle {
114    fn send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError> {
115        let remote_addr: SocketAddr = self.remote_addr;
116        let sender: &mut _ = &mut self.sender;
117        sender
118            .try_send(SerialMessage::new(buffer.into_parts().0, remote_addr))
119            .map_err(|e| ProtoError::from(format!("mpsc::SendError {e}")))
120    }
121}
122
/// Types that implement this are capable of sending a serialized DNS message on a stream
///
/// The underlying Stream implementation should yield `Some(Ok(()))` whenever it is ready to send a message,
///   be pending if it is not ready to send a message, and yield `Err` or `None` in the case that the stream is
///   done, and should be shutdown.
pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin + 'static {
    /// Send a message, and return a stream of response
    ///
    /// # Return
    ///
    /// A stream which will resolve to SerialMessage responses
    fn send_message(&mut self, message: DnsRequest) -> DnsResponseStream;

    /// Allows the upstream user to inform the underlying stream that it should shutdown.
    ///
    /// After this is called, the next time the stream is polled it would be correct for it to report completion. This is not required though, if there are say outstanding requests that are not yet complete, then it would be correct to first wait for those results.
    ///
    // NOTE(review): the earlier wording named `Poll::Ready(Ok(()))` as the value to return, which
    // does not match the `Poll<Option<Result<(), ProtoError>>>` shape of `Stream::poll_next` —
    // confirm the intended terminal value with the implementors.
    fn shutdown(&mut self);

    /// Returns true if the stream has been shutdown with `shutdown`
    fn is_shutdown(&self) -> bool;
}
144
/// Used for associating a name_server to a DnsRequestStreamHandle
///
/// Cloning the handle clones the underlying channel sender.
#[derive(Clone)]
pub struct BufDnsRequestStreamHandle {
    /// Channel on which each request is enqueued, bundled with its response oneshot.
    sender: mpsc::Sender<OneshotDnsRequest>,
}
150
// Unwraps a `Result` inside a function that returns `DnsResponseReceiver`.
//
// * `try_oneshot!(expr)` - evaluates to the `Ok` value; on `Err` it returns early from the
//   enclosing function with `DnsResponseReceiver::Err(Some(..))`, converting the error through
//   `ProtoError::from`.
// * `try_oneshot!(expr,)` (trailing comma) - expands to a plain `expr?`.
macro_rules! try_oneshot {
    ($expr:expr) => {{
        // Name `Result` explicitly so the arms below use the standard library type.
        use std::result::Result;

        match $expr {
            Result::Ok(val) => val,
            Result::Err(err) => return DnsResponseReceiver::Err(Some(ProtoError::from(err))),
        }
    }};
    ($expr:expr,) => {
        $expr?
    };
}
164
165impl DnsHandle for BufDnsRequestStreamHandle {
166    type Response = DnsResponseReceiver;
167    type Error = ProtoError;
168
169    fn send<R: Into<DnsRequest>>(&self, request: R) -> Self::Response {
170        let request: DnsRequest = request.into();
171        debug!(
172            "enqueueing message:{}:{:?}",
173            request.op_code(),
174            request.queries()
175        );
176
177        let (request, oneshot) = OneshotDnsRequest::oneshot(request);
178        let mut sender = self.sender.clone();
179        let try_send = sender.try_send(request).map_err(|_| {
180            debug!("unable to enqueue message");
181            ProtoError::from(ProtoErrorKind::Busy)
182        });
183        try_oneshot!(try_send);
184
185        DnsResponseReceiver::Receiver(oneshot)
186    }
187}
188
// TODO: this future should return the original message in the response on errors
/// A OneshotDnsRequest creates a channel for a response to message
pub struct OneshotDnsRequest {
    /// The request to be delivered.
    dns_request: DnsRequest,
    /// Sending half of the oneshot channel on which the response stream is handed back.
    sender_for_response: oneshot::Sender<DnsResponseStream>,
}
195
196impl OneshotDnsRequest {
197    fn oneshot(dns_request: DnsRequest) -> (Self, oneshot::Receiver<DnsResponseStream>) {
198        let (sender_for_response, receiver) = oneshot::channel();
199
200        (
201            Self {
202                dns_request,
203                sender_for_response,
204            },
205            receiver,
206        )
207    }
208
209    fn into_parts(self) -> (DnsRequest, OneshotDnsResponse) {
210        (
211            self.dns_request,
212            OneshotDnsResponse(self.sender_for_response),
213        )
214    }
215}
216
217struct OneshotDnsResponse(oneshot::Sender<DnsResponseStream>);
218
219impl OneshotDnsResponse {
220    fn send_response(self, serial_response: DnsResponseStream) -> Result<(), DnsResponseStream> {
221        self.0.send(serial_response)
222    }
223}
224
/// A Stream that wraps a oneshot::Receiver<Stream> and resolves to items in the inner Stream
///
/// The variants are the states the stream can be in while it is polled.
pub enum DnsResponseReceiver {
    /// The receiver
    ///
    /// The response stream has not been received yet; polling waits on this oneshot receiver.
    Receiver(oneshot::Receiver<DnsResponseStream>),
    /// The stream once received
    ///
    /// Polling forwards directly to this inner stream.
    Received(DnsResponseStream),
    /// Error during the send operation
    ///
    /// The error is taken out (leaving `None`) when it is yielded; `None` ends the stream.
    Err(Option<ProtoError>),
}
234
235impl Stream for DnsResponseReceiver {
236    type Item = Result<DnsResponse, ProtoError>;
237
238    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239        loop {
240            *self = match *self.as_mut() {
241                Self::Receiver(ref mut receiver) => {
242                    let receiver = Pin::new(receiver);
243                    let future = ready!(receiver
244                        .poll(cx)
245                        .map_err(|_| ProtoError::from("receiver was canceled")))?;
246                    Self::Received(future)
247                }
248                Self::Received(ref mut stream) => {
249                    return stream.poll_next_unpin(cx);
250                }
251                Self::Err(ref mut err) => return Poll::Ready(err.take().map(Err)),
252            };
253        }
254    }
255}
256
257/// Helper trait to convert a Stream of dns response into a Future
258pub trait FirstAnswer<T, E: From<ProtoError>>: Stream<Item = Result<T, E>> + Unpin + Sized {
259    /// Convert a Stream of dns response into a Future yielding the first answer,
260    /// discarding others if any.
261    fn first_answer(self) -> FirstAnswerFuture<Self> {
262        FirstAnswerFuture { stream: Some(self) }
263    }
264}
265
// Blanket implementation: every `Unpin` stream of `Result<T, E>` whose error type can be built
// from a `ProtoError` gets `first_answer` without any per-type code.
impl<E, S, T> FirstAnswer<T, E> for S
where
    S: Stream<Item = Result<T, E>> + Unpin + Sized,
    E: From<ProtoError>,
{
}
272
/// See [FirstAnswer::first_answer]
///
/// Future that resolves to the first item produced by the wrapped stream.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FirstAnswerFuture<S> {
    /// The wrapped stream; set to `None` once the future has produced its result.
    stream: Option<S>,
}
279
280impl<E, S: Stream<Item = Result<T, E>> + Unpin, T> Future for FirstAnswerFuture<S>
281where
282    S: Stream<Item = Result<T, E>> + Unpin + Sized,
283    E: From<ProtoError>,
284{
285    type Output = S::Item;
286
287    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
288        let s = self
289            .stream
290            .as_mut()
291            .expect("polling FirstAnswerFuture twice");
292        let item = match ready!(s.poll_next_unpin(cx)) {
293            Some(r) => r,
294            None => Err(ProtoError::from(ProtoErrorKind::Timeout).into()),
295        };
296        self.stream.take();
297        Poll::Ready(item)
298    }
299}