tokio_socks/io/
mod.rs

1//! Asynchronous I/O abstractions for sockets.
2
3#[cfg(feature = "tokio")]
4mod tokio;
5
6use std::{
7    future::Future,
8    io::{Error, ErrorKind},
9    mem,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use futures_util::ready;
15
16#[cfg(feature = "futures-io")]
17mod compat;
18#[cfg(feature = "futures-io")]
19pub use compat::Compat;
20
/// A trait for asynchronous socket I/O.
///
/// Any type that implements tokio's `AsyncRead` and `AsyncWrite` traits
/// also implements the `AsyncSocket` trait.
///
/// Use `FuturesIoCompatExt` to wrap `futures-io` types as `AsyncSocket` types.
// NOTE(review): this module only re-exports `Compat` (behind the `futures-io`
// feature); confirm that `FuturesIoCompatExt` is still the correct name to
// point users at.
pub trait AsyncSocket {
    /// Attempts to read bytes from the socket into `buf`.
    ///
    /// `Poll::Ready(Ok(n))` reports that `n` bytes were placed at the start of
    /// `buf`; the `read_exact` helper in this module treats `n == 0` as
    /// end-of-stream. `Poll::Ready(Err(_))` reports an I/O failure.
    /// `Poll::Pending` means no result is available yet; presumably the
    /// implementation arranges for the task behind `cx` to be woken, as is
    /// conventional for `poll_*` methods.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, Error>>;

    /// Attempts to write bytes from `buf` to the socket.
    ///
    /// `Poll::Ready(Ok(n))` reports that the first `n` bytes of `buf` were
    /// accepted; the `write_all` helper in this module treats `n == 0` as a
    /// `WriteZero` error. `Poll::Ready(Err(_))` reports an I/O failure.
    /// `Poll::Pending` means no result is available yet; presumably the
    /// implementation arranges for the task behind `cx` to be woken, as is
    /// conventional for `poll_*` methods.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>>;
}
32
/// Crate-internal convenience methods layered on top of [`AsyncSocket`].
///
/// Each method only constructs a future; no I/O happens until that future is
/// polled.
pub(crate) trait AsyncSocketExt {
    /// Returns a future that reads from `self` until `buf` is completely filled.
    ///
    /// On success the future resolves to the original length of `buf`. It
    /// resolves to an `ErrorKind::UnexpectedEof` error if the socket reports a
    /// zero-byte read before the buffer is full, and forwards any error
    /// returned by `poll_read`.
    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
    where Self: Sized;

    /// Returns a future that writes all of `buf` to `self`.
    ///
    /// On success the future resolves to `()`. It resolves to an
    /// `ErrorKind::WriteZero` error if the socket reports a zero-byte write
    /// while data is still outstanding, and forwards any error returned by
    /// `poll_write`.
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
    where Self: Sized;
}
40
41impl<S: AsyncSocket> AsyncSocketExt for S {
42    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
43    where Self: Sized {
44        let capacity = buf.len();
45        ReadExact {
46            reader: self,
47            buf,
48            capacity,
49        }
50    }
51
52    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
53    where Self: Sized {
54        WriteAll { writer: self, buf }
55    }
56}
57
/// Future returned by `AsyncSocketExt::read_exact`.
///
/// Resolves once the whole destination buffer has been filled.
pub(crate) struct ReadExact<'a, R> {
    /// Socket that is polled for incoming bytes.
    reader: &'a mut R,
    /// Portion of the destination buffer that has not been filled yet; it is
    /// narrowed from the front after every successful read.
    buf: &'a mut [u8],
    /// Length of the destination buffer when the future was created; this is
    /// the value the future resolves to on success.
    capacity: usize,
}
63
64impl<R: AsyncSocket + Unpin> Future for ReadExact<'_, R> {
65    type Output = Result<usize, Error>;
66
67    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68        let this = &mut *self;
69        while !this.buf.is_empty() {
70            let n = ready!(Pin::new(&mut *this.reader).poll_read(cx, this.buf))?;
71            {
72                let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
73                this.buf = rest;
74            }
75            if n == 0 {
76                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
77            }
78        }
79        Poll::Ready(Ok(this.capacity))
80    }
81}
82
/// Future returned by `AsyncSocketExt::write_all`.
///
/// Resolves once every byte of the source buffer has been written.
pub(crate) struct WriteAll<'a, W> {
    /// Socket that the bytes are written to.
    writer: &'a mut W,
    /// Bytes that still have to be written; narrowed from the front after
    /// every successful write.
    buf: &'a [u8],
}
87
88impl<W: AsyncSocket + Unpin> Future for WriteAll<'_, W> {
89    type Output = Result<(), Error>;
90
91    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        let this = &mut *self;
93        while !this.buf.is_empty() {
94            let n = ready!(Pin::new(&mut *this.writer).poll_write(cx, this.buf))?;
95            {
96                let (_, rest) = mem::take(&mut this.buf).split_at(n);
97                this.buf = rest;
98            }
99            if n == 0 {
100                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
101            }
102        }
103
104        Poll::Ready(Ok(()))
105    }
106}