tokio_io/
framed.rs

1#![allow(deprecated)]
2
3use std::fmt;
4use std::io::{self, Read, Write};
5
6use codec::{Decoder, Encoder};
7use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
8use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
9use {AsyncRead, AsyncWrite};
10
11use bytes::BytesMut;
12use futures::{Poll, Sink, StartSend, Stream};
13
14/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
15/// the `Encoder` and `Decoder` traits to encode and decode frames.
16///
17/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
18#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
19#[doc(hidden)]
20pub struct Framed<T, U> {
21    inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
22}
23
24#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
25#[doc(hidden)]
26pub struct Fuse<T, U>(pub T, pub U);
27
28pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
29where
30    T: AsyncRead + AsyncWrite,
31    U: Decoder + Encoder,
32{
33    Framed {
34        inner: framed_read2(framed_write2(Fuse(inner, codec))),
35    }
36}
37
38impl<T, U> Framed<T, U> {
39    /// Provides a `Stream` and `Sink` interface for reading and writing to this
40    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
41    ///
42    /// Raw I/O objects work with byte sequences, but higher-level code usually
43    /// wants to batch these into meaningful chunks, called "frames". This
44    /// method layers framing on top of an I/O object, by using the `Codec`
45    /// traits to handle encoding and decoding of messages frames. Note that
46    /// the incoming and outgoing frame types may be distinct.
47    ///
48    /// This function returns a *single* object that is both `Stream` and
49    /// `Sink`; grouping this into a single object is often useful for layering
50    /// things like gzip or TLS, which require both read and write access to the
51    /// underlying object.
52    ///
53    /// This objects takes a stream and a readbuffer and a writebuffer. These field
54    /// can be obtained from an existing `Framed` with the `into_parts` method.
55    ///
56    /// If you want to work more directly with the streams and sink, consider
57    /// calling `split` on the `Framed` returned by this method, which will
58    /// break them into separate objects, allowing them to interact more easily.
59    pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U> {
60        Framed {
61            inner: framed_read2_with_buffer(
62                framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf),
63                parts.readbuf,
64            ),
65        }
66    }
67
68    /// Returns a reference to the underlying I/O stream wrapped by
69    /// `Frame`.
70    ///
71    /// Note that care should be taken to not tamper with the underlying stream
72    /// of data coming in as it may corrupt the stream of frames otherwise
73    /// being worked with.
74    pub fn get_ref(&self) -> &T {
75        &self.inner.get_ref().get_ref().0
76    }
77
78    /// Returns a mutable reference to the underlying I/O stream wrapped by
79    /// `Frame`.
80    ///
81    /// Note that care should be taken to not tamper with the underlying stream
82    /// of data coming in as it may corrupt the stream of frames otherwise
83    /// being worked with.
84    pub fn get_mut(&mut self) -> &mut T {
85        &mut self.inner.get_mut().get_mut().0
86    }
87
88    /// Consumes the `Frame`, returning its underlying I/O stream.
89    ///
90    /// Note that care should be taken to not tamper with the underlying stream
91    /// of data coming in as it may corrupt the stream of frames otherwise
92    /// being worked with.
93    pub fn into_inner(self) -> T {
94        self.inner.into_inner().into_inner().0
95    }
96
97    /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
98    /// with unprocessed data.
99    ///
100    /// Note that care should be taken to not tamper with the underlying stream
101    /// of data coming in as it may corrupt the stream of frames otherwise
102    /// being worked with.
103    pub fn into_parts(self) -> FramedParts<T> {
104        let (inner, readbuf) = self.inner.into_parts();
105        let (inner, writebuf) = inner.into_parts();
106        FramedParts {
107            inner: inner.0,
108            readbuf: readbuf,
109            writebuf: writebuf,
110        }
111    }
112
113    /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
114    /// with unprocessed data, and also the current codec state.
115    ///
116    /// Note that care should be taken to not tamper with the underlying stream
117    /// of data coming in as it may corrupt the stream of frames otherwise
118    /// being worked with.
119    ///
120    /// Note that this function will be removed once the codec has been
121    /// integrated into `FramedParts` in a new version (see
122    /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
123    pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
124        let (inner, readbuf) = self.inner.into_parts();
125        let (inner, writebuf) = inner.into_parts();
126        (
127            FramedParts {
128                inner: inner.0,
129                readbuf: readbuf,
130                writebuf: writebuf,
131            },
132            inner.1,
133        )
134    }
135}
136
137impl<T, U> Stream for Framed<T, U>
138where
139    T: AsyncRead,
140    U: Decoder,
141{
142    type Item = U::Item;
143    type Error = U::Error;
144
145    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
146        self.inner.poll()
147    }
148}
149
150impl<T, U> Sink for Framed<T, U>
151where
152    T: AsyncWrite,
153    U: Encoder,
154    U::Error: From<io::Error>,
155{
156    type SinkItem = U::Item;
157    type SinkError = U::Error;
158
159    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
160        self.inner.get_mut().start_send(item)
161    }
162
163    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
164        self.inner.get_mut().poll_complete()
165    }
166
167    fn close(&mut self) -> Poll<(), Self::SinkError> {
168        self.inner.get_mut().close()
169    }
170}
171
172impl<T, U> fmt::Debug for Framed<T, U>
173where
174    T: fmt::Debug,
175    U: fmt::Debug,
176{
177    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
178        f.debug_struct("Framed")
179            .field("io", &self.inner.get_ref().get_ref().0)
180            .field("codec", &self.inner.get_ref().get_ref().1)
181            .finish()
182    }
183}
184
185// ===== impl Fuse =====
186
187impl<T: Read, U> Read for Fuse<T, U> {
188    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
189        self.0.read(dst)
190    }
191}
192
193impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
194    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
195        self.0.prepare_uninitialized_buffer(buf)
196    }
197}
198
199impl<T: Write, U> Write for Fuse<T, U> {
200    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
201        self.0.write(src)
202    }
203
204    fn flush(&mut self) -> io::Result<()> {
205        self.0.flush()
206    }
207}
208
209impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
210    fn shutdown(&mut self) -> Poll<(), io::Error> {
211        self.0.shutdown()
212    }
213}
214
215impl<T, U: Decoder> Decoder for Fuse<T, U> {
216    type Item = U::Item;
217    type Error = U::Error;
218
219    fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
220        self.1.decode(buffer)
221    }
222
223    fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
224        self.1.decode_eof(buffer)
225    }
226}
227
228impl<T, U: Encoder> Encoder for Fuse<T, U> {
229    type Item = U::Item;
230    type Error = U::Error;
231
232    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
233        self.1.encode(item, dst)
234    }
235}
236
237/// `FramedParts` contains an export of the data of a Framed transport.
238/// It can be used to construct a new `Framed` with a different codec.
239/// It contains all current buffers and the inner transport.
240#[derive(Debug)]
241pub struct FramedParts<T> {
242    /// The inner transport used to read bytes to and write bytes to
243    pub inner: T,
244    /// The buffer with read but unprocessed data.
245    pub readbuf: BytesMut,
246    /// A buffer with unprocessed data which are not written yet.
247    pub writebuf: BytesMut,
248}