1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Integration of TUN/TAP into tokio.
//!
//! See the [`Async`](struct.Async.html) structure.
extern crate futures;
extern crate libc;
extern crate mio;
extern crate tokio_core;

use std::io::{Error, ErrorKind, Read, Result, Write};
use std::os::unix::io::AsRawFd;

use self::futures::{Async as FAsync, AsyncSink, Sink, StartSend, Stream, Poll as FPoll};
use self::mio::{Evented, Poll as MPoll, PollOpt, Ready, Token};
use self::mio::unix::EventedFd;
use self::tokio_core::reactor::{Handle, PollEvented};

use super::Iface;

struct MioWrapper {
    iface: Iface,
}

impl Evented for MioWrapper {
    fn register(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
        EventedFd(&self.iface.as_raw_fd()).register(poll, token, events, opts)
    }
    fn reregister(&self, poll: &MPoll, token: Token, events: Ready, opts: PollOpt) -> Result<()> {
        EventedFd(&self.iface.as_raw_fd()).reregister(poll, token, events, opts)
    }
    fn deregister(&self, poll: &MPoll) -> Result<()> {
        EventedFd(&self.iface.as_raw_fd()).deregister(poll)
    }
}

impl Read for MioWrapper {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.iface.recv(buf)
    }
}

impl Write for MioWrapper {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.iface.send(buf)
    }
    fn flush(&mut self) -> Result<()> {
        Ok(())
    }
}

/// A wrapper around [`Iface`](../struct.Iface.html) for use in connection with tokio.
///
/// This turns the synchronous `Iface` into an asynchronous `Sink + Stream` of packets.
pub struct Async {
    mio: PollEvented<MioWrapper>,
    recv_bufsize: usize,
}

impl Async {
    /// Consumes an `Iface` and wraps it in a new `Async`.
    ///
    /// # Parameters
    ///
    /// * `iface`: The created interface to wrap. It gets consumed.
    /// * `handle`: The handle to tokio's `Core` to run on.
    ///
    /// # Errors
    ///
    /// This fails with an error in case of low-level OS errors (they shouldn't usually happen).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # extern crate futures;
    /// # extern crate tokio_core;
    /// # extern crate tun_tap;
    /// # use futures::Stream;
    /// # use tun_tap::*;
    /// # use tun_tap::async::*;
    /// # use tokio_core::reactor::Core;
    /// # fn main() {
    /// let iface = Iface::new("mytun%d", Mode::Tun).unwrap();
    /// let name = iface.name().to_owned();
    /// // Bring the interface up by `ip addr add IP dev $name; ip link set up dev $name`
    /// let core = Core::new().unwrap();
    /// let async = Async::new(iface, &core.handle()).unwrap();
    /// let (sink, stream) = async.split();
    /// # }
    /// ```
    pub fn new(iface: Iface, handle: &Handle) -> Result<Self> {
        iface.set_non_blocking()?;
        Ok(Async {
            mio: PollEvented::new(MioWrapper { iface }, handle)?,
            recv_bufsize: 1542,
        })
    }
    /// Sets the receive buffer size.
    ///
    /// When receiving a packet, a buffer of this size is allocated and the packet read into it.
    /// This configures the size of the buffer.
    ///
    /// This needs to be called when the interface's MTU is changed from the default 1500. The
    /// default should be enough otherwise.
    pub fn set_recv_bufsize(&mut self, bufsize: usize) {
        self.recv_bufsize = bufsize;
    }
}

impl Stream for Async {
    type Item = Vec<u8>;
    type Error = Error;
    fn poll(&mut self) -> FPoll<Option<Self::Item>, Self::Error> {
        // TODO Reuse buffer?
        let mut buffer = vec![0; self.recv_bufsize];
        match self.mio.read(&mut buffer) {
            Ok(size) => {
                buffer.resize(size, 0);
                Ok(FAsync::Ready(Some(buffer)))
            },
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(FAsync::NotReady),
            Err(e) => Err(e),
        }
    }
}

impl Sink for Async {
    type SinkItem = Vec<u8>;
    type SinkError = Error;
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        match self.mio.write(&item) {
            // TODO What to do about short write? Can it happen?
            Ok(_size) => Ok(AsyncSink::Ready),
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::NotReady(item)),
            Err(e) => Err(e),
        }
    }
    fn poll_complete(&mut self) -> FPoll<(), Self::SinkError> {
        Ok(FAsync::Ready(()))
    }
}