1#[cfg(feature = "tokio")]
4mod tokio;
5
6use std::{
7 future::Future,
8 io::{Error, ErrorKind},
9 mem,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use futures_util::ready;
15
16#[cfg(feature = "futures-io")]
17mod compat;
18#[cfg(feature = "futures-io")]
19pub use compat::Compat;
20
/// A byte-oriented socket driven by polling rather than blocking calls.
///
/// Both methods take the socket pinned and a task [`Context`], and report
/// their outcome as a [`Poll`] wrapping an I/O [`Result`] that carries a byte
/// count on success.
pub trait AsyncSocket {
    /// Attempts to read bytes from the socket into `buf`.
    ///
    /// On success the returned count is treated by this crate's `read_exact`
    /// helper as the number of bytes placed at the front of `buf`; a count of
    /// `0` is interpreted there as end-of-stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>>;

    /// Attempts to write bytes from `buf` to the socket.
    ///
    /// On success the returned count is treated by this crate's `write_all`
    /// helper as the number of bytes consumed from the front of `buf`; a count
    /// of `0` is reported there as a write-zero error.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>>;
}
32
33pub(crate) trait AsyncSocketExt {
34 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
35 where Self: Sized;
36
37 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
38 where Self: Sized;
39}
40
41impl<S: AsyncSocket> AsyncSocketExt for S {
42 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
43 where Self: Sized {
44 let capacity = buf.len();
45 ReadExact {
46 reader: self,
47 buf,
48 capacity,
49 }
50 }
51
52 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
53 where Self: Sized {
54 WriteAll { writer: self, buf }
55 }
56}
57
/// Future that keeps reading from `reader` until `buf` has been filled.
///
/// Fields:
/// - `reader`: the socket being read from.
/// - `buf`: the part of the destination buffer that is still unfilled; it
///   shrinks from the front as bytes arrive.
/// - `capacity`: the length of the buffer when the future was created, which
///   is the value the future resolves to on success.
///
/// Marked `#[must_use]` because a concrete future type does not inherit the
/// lint from the `Future` trait; without it, dropping the value un-awaited
/// would silently perform no I/O.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct ReadExact<'a, R> {
    reader: &'a mut R,
    buf: &'a mut [u8],
    capacity: usize,
}
63
64impl<R: AsyncSocket + Unpin> Future for ReadExact<'_, R> {
65 type Output = Result<usize, Error>;
66
67 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68 let this = &mut *self;
69 while !this.buf.is_empty() {
70 let n = ready!(Pin::new(&mut *this.reader).poll_read(cx, this.buf))?;
71 {
72 let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
73 this.buf = rest;
74 }
75 if n == 0 {
76 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
77 }
78 }
79 Poll::Ready(Ok(this.capacity))
80 }
81}
82
/// Future that keeps writing to `writer` until all of `buf` has been accepted.
///
/// Fields:
/// - `writer`: the socket being written to.
/// - `buf`: the bytes still to be written; it shrinks from the front as the
///   socket accepts data.
///
/// Marked `#[must_use]` because a concrete future type does not inherit the
/// lint from the `Future` trait; without it, dropping the value un-awaited
/// would silently send nothing.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) struct WriteAll<'a, W> {
    writer: &'a mut W,
    buf: &'a [u8],
}
87
88impl<W: AsyncSocket + Unpin> Future for WriteAll<'_, W> {
89 type Output = Result<(), Error>;
90
91 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 let this = &mut *self;
93 while !this.buf.is_empty() {
94 let n = ready!(Pin::new(&mut *this.writer).poll_write(cx, this.buf))?;
95 {
96 let (_, rest) = mem::take(&mut this.buf).split_at(n);
97 this.buf = rest;
98 }
99 if n == 0 {
100 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
101 }
102 }
103
104 Poll::Ready(Ok(()))
105 }
106}