tokio_io/
lines.rs

1use std::io::{self, BufRead};
2use std::mem;
3
4use futures::{Poll, Stream};
5
6use AsyncRead;
7
/// Stream over the lines of text read from an I/O object.
///
/// Values of this type are created by the top-level `lines` function.
#[derive(Debug)]
pub struct Lines<A> {
    /// The I/O object that lines are read from.
    io: A,
    /// Text of the line currently being read. Once a line is complete it is
    /// handed to the consumer and this buffer is replaced with an empty
    /// `String`.
    line: String,
}
15
16/// Creates a new stream from the I/O object given representing the lines of
17/// input that are found on `A`.
18///
19/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of
20/// lines that the object contains. The returned stream will reach its end once
21/// `a` reaches EOF.
22pub fn lines<A>(a: A) -> Lines<A>
23where
24    A: AsyncRead + BufRead,
25{
26    Lines {
27        io: a,
28        line: String::new(),
29    }
30}
31
32impl<A> Lines<A> {
33    /// Returns the underlying I/O object.
34    ///
35    /// Note that this may lose data already read into internal buffers. It's
36    /// recommended to only call this once the stream has reached its end.
37    pub fn into_inner(self) -> A {
38        self.io
39    }
40}
41
42impl<A> Stream for Lines<A>
43where
44    A: AsyncRead + BufRead,
45{
46    type Item = String;
47    type Error = io::Error;
48
49    fn poll(&mut self) -> Poll<Option<String>, io::Error> {
50        let n = try_nb!(self.io.read_line(&mut self.line));
51        if n == 0 && self.line.len() == 0 {
52            return Ok(None.into());
53        }
54        if self.line.ends_with("\n") {
55            self.line.pop();
56            if self.line.ends_with("\r") {
57                self.line.pop();
58            }
59        }
60        Ok(Some(mem::replace(&mut self.line, String::new())).into())
61    }
62}