broker_tokio/net/unix/
stream.rs1use crate::future::poll_fn;
2use crate::io::{AsyncRead, AsyncWrite, PollEvented};
3use crate::net::unix::split::{split, ReadHalf, WriteHalf};
4use crate::net::unix::ucred::{self, UCred};
5
6use std::convert::TryFrom;
7use std::fmt;
8use std::io::{self, Read, Write};
9use std::mem::MaybeUninit;
10use std::net::Shutdown;
11use std::os::unix::io::{AsRawFd, RawFd};
12use std::os::unix::net::{self, SocketAddr};
13use std::path::Path;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17cfg_uds! {
18 pub struct UnixStream {
24 io: PollEvented<mio_uds::UnixStream>,
25 }
26}
27
28impl UnixStream {
29 pub async fn connect<P>(path: P) -> io::Result<UnixStream>
35 where
36 P: AsRef<Path>,
37 {
38 let stream = mio_uds::UnixStream::connect(path)?;
39 let stream = UnixStream::new(stream)?;
40
41 poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
42 Ok(stream)
43 }
44
45 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
59 let stream = mio_uds::UnixStream::from_stream(stream)?;
60 let io = PollEvented::new(stream)?;
61
62 Ok(UnixStream { io })
63 }
64
65 pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
71 let (a, b) = mio_uds::UnixStream::pair()?;
72 let a = UnixStream::new(a)?;
73 let b = UnixStream::new(b)?;
74
75 Ok((a, b))
76 }
77
78 pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result<UnixStream> {
79 let io = PollEvented::new(stream)?;
80 Ok(UnixStream { io })
81 }
82
83 pub fn local_addr(&self) -> io::Result<SocketAddr> {
85 self.io.get_ref().local_addr()
86 }
87
88 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
90 self.io.get_ref().peer_addr()
91 }
92
93 pub fn peer_cred(&self) -> io::Result<UCred> {
95 ucred::get_peer_cred(self)
96 }
97
98 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
100 self.io.get_ref().take_error()
101 }
102
103 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
109 self.io.get_ref().shutdown(how)
110 }
111
112 pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
118 split(self)
119 }
120}
121
122impl TryFrom<UnixStream> for mio_uds::UnixStream {
123 type Error = io::Error;
124
125 fn try_from(value: UnixStream) -> Result<Self, Self::Error> {
132 value.io.into_inner()
133 }
134}
135
136impl TryFrom<net::UnixStream> for UnixStream {
137 type Error = io::Error;
138
139 fn try_from(stream: net::UnixStream) -> io::Result<Self> {
144 Self::from_std(stream)
145 }
146}
147
148impl AsyncRead for UnixStream {
149 unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
150 false
151 }
152
153 fn poll_read(
154 self: Pin<&mut Self>,
155 cx: &mut Context<'_>,
156 buf: &mut [u8],
157 ) -> Poll<io::Result<usize>> {
158 self.poll_read_priv(cx, buf)
159 }
160}
161
162impl AsyncWrite for UnixStream {
163 fn poll_write(
164 self: Pin<&mut Self>,
165 cx: &mut Context<'_>,
166 buf: &[u8],
167 ) -> Poll<io::Result<usize>> {
168 self.poll_write_priv(cx, buf)
169 }
170
171 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
172 Poll::Ready(Ok(()))
173 }
174
175 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
176 Poll::Ready(Ok(()))
177 }
178}
179
180impl UnixStream {
181 pub(crate) fn poll_read_priv(
193 &self,
194 cx: &mut Context<'_>,
195 buf: &mut [u8],
196 ) -> Poll<io::Result<usize>> {
197 ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
198
199 match self.io.get_ref().read(buf) {
200 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
201 self.io.clear_read_ready(cx, mio::Ready::readable())?;
202 Poll::Pending
203 }
204 x => Poll::Ready(x),
205 }
206 }
207
208 pub(crate) fn poll_write_priv(
209 &self,
210 cx: &mut Context<'_>,
211 buf: &[u8],
212 ) -> Poll<io::Result<usize>> {
213 ready!(self.io.poll_write_ready(cx))?;
214
215 match self.io.get_ref().write(buf) {
216 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
217 self.io.clear_write_ready(cx)?;
218 Poll::Pending
219 }
220 x => Poll::Ready(x),
221 }
222 }
223}
224
225impl fmt::Debug for UnixStream {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 self.io.get_ref().fmt(f)
228 }
229}
230
231impl AsRawFd for UnixStream {
232 fn as_raw_fd(&self) -> RawFd {
233 self.io.get_ref().as_raw_fd()
234 }
235}