hickory_proto/xfer/
mod.rs1use std::fmt::{Debug, Display};
8use std::net::SocketAddr;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use futures_channel::mpsc;
13use futures_channel::oneshot;
14use futures_util::future::Future;
15use futures_util::ready;
16use futures_util::stream::{Fuse, Peekable, Stream, StreamExt};
17use tracing::{debug, warn};
18
19use crate::error::*;
20use crate::Time;
21
22mod dns_exchange;
23pub mod dns_handle;
24pub mod dns_multiplexer;
25pub mod dns_request;
26pub mod dns_response;
27#[cfg(feature = "dnssec")]
28#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
29pub mod dnssec_dns_handle;
30pub mod retry_dns_handle;
31mod serial_message;
32
33pub use self::dns_exchange::{
34 DnsExchange, DnsExchangeBackground, DnsExchangeConnect, DnsExchangeSend,
35};
36pub use self::dns_handle::{DnsHandle, DnsStreamHandle};
37pub use self::dns_multiplexer::{DnsMultiplexer, DnsMultiplexerConnect};
38pub use self::dns_request::{DnsRequest, DnsRequestOptions};
39pub use self::dns_response::{DnsResponse, DnsResponseStream};
40#[cfg(feature = "dnssec")]
41#[cfg_attr(docsrs, doc(cfg(feature = "dnssec")))]
42pub use self::dnssec_dns_handle::DnssecDnsHandle;
43pub use self::retry_dns_handle::RetryDnsHandle;
44pub use self::serial_message::SerialMessage;
45
46fn ignore_send<M, T>(result: Result<M, mpsc::TrySendError<T>>) {
48 if let Err(error) = result {
49 if error.is_disconnected() {
50 debug!("ignoring send error on disconnected stream");
51 return;
52 }
53
54 warn!("error notifying wait, possible future leak: {:?}", error);
55 }
56}
57
58pub trait DnsClientStream:
60 Stream<Item = Result<SerialMessage, ProtoError>> + Display + Send
61{
62 type Time: Time;
64
65 fn name_server_addr(&self) -> SocketAddr;
67}
68
69pub type StreamReceiver = Peekable<Fuse<mpsc::Receiver<SerialMessage>>>;
71
72const CHANNEL_BUFFER_SIZE: usize = 32;
73
74#[derive(Clone)]
78pub struct BufDnsStreamHandle {
79 remote_addr: SocketAddr,
80 sender: mpsc::Sender<SerialMessage>,
81}
82
83impl BufDnsStreamHandle {
84 pub fn new(remote_addr: SocketAddr) -> (Self, StreamReceiver) {
91 let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_SIZE);
92 let receiver = receiver.fuse().peekable();
93
94 let this = Self {
95 remote_addr,
96 sender,
97 };
98
99 (this, receiver)
100 }
101
102 pub fn with_remote_addr(&self, remote_addr: SocketAddr) -> Self {
106 Self {
107 remote_addr,
108 sender: self.sender.clone(),
109 }
110 }
111}
112
113impl DnsStreamHandle for BufDnsStreamHandle {
114 fn send(&mut self, buffer: SerialMessage) -> Result<(), ProtoError> {
115 let remote_addr: SocketAddr = self.remote_addr;
116 let sender: &mut _ = &mut self.sender;
117 sender
118 .try_send(SerialMessage::new(buffer.into_parts().0, remote_addr))
119 .map_err(|e| ProtoError::from(format!("mpsc::SendError {e}")))
120 }
121}
122
123pub trait DnsRequestSender: Stream<Item = Result<(), ProtoError>> + Send + Unpin + 'static {
129 fn send_message(&mut self, message: DnsRequest) -> DnsResponseStream;
135
136 fn shutdown(&mut self);
140
141 fn is_shutdown(&self) -> bool;
143}
144
145#[derive(Clone)]
147pub struct BufDnsRequestStreamHandle {
148 sender: mpsc::Sender<OneshotDnsRequest>,
149}
150
151macro_rules! try_oneshot {
152 ($expr:expr) => {{
153 use std::result::Result;
154
155 match $expr {
156 Result::Ok(val) => val,
157 Result::Err(err) => return DnsResponseReceiver::Err(Some(ProtoError::from(err))),
158 }
159 }};
160 ($expr:expr,) => {
161 $expr?
162 };
163}
164
165impl DnsHandle for BufDnsRequestStreamHandle {
166 type Response = DnsResponseReceiver;
167 type Error = ProtoError;
168
169 fn send<R: Into<DnsRequest>>(&self, request: R) -> Self::Response {
170 let request: DnsRequest = request.into();
171 debug!(
172 "enqueueing message:{}:{:?}",
173 request.op_code(),
174 request.queries()
175 );
176
177 let (request, oneshot) = OneshotDnsRequest::oneshot(request);
178 let mut sender = self.sender.clone();
179 let try_send = sender.try_send(request).map_err(|_| {
180 debug!("unable to enqueue message");
181 ProtoError::from(ProtoErrorKind::Busy)
182 });
183 try_oneshot!(try_send);
184
185 DnsResponseReceiver::Receiver(oneshot)
186 }
187}
188
189pub struct OneshotDnsRequest {
192 dns_request: DnsRequest,
193 sender_for_response: oneshot::Sender<DnsResponseStream>,
194}
195
196impl OneshotDnsRequest {
197 fn oneshot(dns_request: DnsRequest) -> (Self, oneshot::Receiver<DnsResponseStream>) {
198 let (sender_for_response, receiver) = oneshot::channel();
199
200 (
201 Self {
202 dns_request,
203 sender_for_response,
204 },
205 receiver,
206 )
207 }
208
209 fn into_parts(self) -> (DnsRequest, OneshotDnsResponse) {
210 (
211 self.dns_request,
212 OneshotDnsResponse(self.sender_for_response),
213 )
214 }
215}
216
217struct OneshotDnsResponse(oneshot::Sender<DnsResponseStream>);
218
219impl OneshotDnsResponse {
220 fn send_response(self, serial_response: DnsResponseStream) -> Result<(), DnsResponseStream> {
221 self.0.send(serial_response)
222 }
223}
224
225pub enum DnsResponseReceiver {
227 Receiver(oneshot::Receiver<DnsResponseStream>),
229 Received(DnsResponseStream),
231 Err(Option<ProtoError>),
233}
234
235impl Stream for DnsResponseReceiver {
236 type Item = Result<DnsResponse, ProtoError>;
237
238 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
239 loop {
240 *self = match *self.as_mut() {
241 Self::Receiver(ref mut receiver) => {
242 let receiver = Pin::new(receiver);
243 let future = ready!(receiver
244 .poll(cx)
245 .map_err(|_| ProtoError::from("receiver was canceled")))?;
246 Self::Received(future)
247 }
248 Self::Received(ref mut stream) => {
249 return stream.poll_next_unpin(cx);
250 }
251 Self::Err(ref mut err) => return Poll::Ready(err.take().map(Err)),
252 };
253 }
254 }
255}
256
257pub trait FirstAnswer<T, E: From<ProtoError>>: Stream<Item = Result<T, E>> + Unpin + Sized {
259 fn first_answer(self) -> FirstAnswerFuture<Self> {
262 FirstAnswerFuture { stream: Some(self) }
263 }
264}
265
266impl<E, S, T> FirstAnswer<T, E> for S
267where
268 S: Stream<Item = Result<T, E>> + Unpin + Sized,
269 E: From<ProtoError>,
270{
271}
272
273#[derive(Debug)]
275#[must_use = "futures do nothing unless you `.await` or poll them"]
276pub struct FirstAnswerFuture<S> {
277 stream: Option<S>,
278}
279
280impl<E, S: Stream<Item = Result<T, E>> + Unpin, T> Future for FirstAnswerFuture<S>
281where
282 S: Stream<Item = Result<T, E>> + Unpin + Sized,
283 E: From<ProtoError>,
284{
285 type Output = S::Item;
286
287 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
288 let s = self
289 .stream
290 .as_mut()
291 .expect("polling FirstAnswerFuture twice");
292 let item = match ready!(s.poll_next_unpin(cx)) {
293 Some(r) => r,
294 None => Err(ProtoError::from(ProtoErrorKind::Timeout).into()),
295 };
296 self.stream.take();
297 Poll::Ready(item)
298 }
299}