websocket/server/upgrade/
sync.rs

1//! Allows you to take an existing request or stream of data and convert it into a
2//! WebSocket client.
3use crate::client::sync::Client;
4use crate::server::upgrade::{validate, HyperIntoWsError, Request, WsUpgrade};
5use crate::stream::sync::{AsTcpStream, Stream};
6use std::io;
7use std::net::TcpStream;
8
9use hyper::buffer::BufReader;
10use hyper::header::Headers;
11use hyper::http::h1::parse_request;
12use hyper::http::h1::Incoming;
13use hyper::net::NetworkStream;
14use hyper::status::StatusCode;
15
16const DEFAULT_MAX_DATAFRAME_SIZE : usize = 1024*1024*100;
17const DEFAULT_MAX_MESSAGE_SIZE : usize = 1024*1024*200;
18
19/// This crate uses buffered readers to read in the handshake quickly, in order to
20/// interface with other use cases that don't use buffered readers the buffered readers
21/// is deconstructed when it is returned to the user and given as the underlying
22/// reader and the buffer.
23///
24/// This struct represents bytes that have already been read in from the stream.
25/// A slice of valid data in this buffer can be obtained by: `&buf[pos..cap]`.
26#[derive(Debug)]
27pub struct Buffer {
28	/// the contents of the buffered stream data
29	pub buf: Vec<u8>,
30	/// the current position of cursor in the buffer
31	/// Any data before `pos` has already been read and parsed.
32	pub pos: usize,
33	/// the last location of valid data
34	/// Any data after `cap` is not valid.
35	pub cap: usize,
36}
37
38/// If you have your requests separate from your stream you can use this struct
39/// to upgrade the connection based on the request given
40/// (the request should be a handshake).
41pub struct RequestStreamPair<S: Stream>(pub S, pub Request);
42
43/// The synchronous specialization of `WsUpgrade`.
44/// See the `WsUpgrade` docs for usage and the extra synchronous methods
45/// given by this specialization.
46pub type Upgrade<S> = WsUpgrade<S, Option<Buffer>>;
47
48/// These methods are the synchronous ways of accepting and rejecting a websocket
49/// handshake.
50impl<S> WsUpgrade<S, Option<Buffer>>
51where
52	S: Stream,
53{
54	/// Accept the handshake request and send a response,
55	/// if nothing goes wrong a client will be created.
56	pub fn accept(self) -> Result<Client<S>, (S, io::Error)> {
57		self.internal_accept(None)
58	}
59
60	/// Accept the handshake request and send a response while
61	/// adding on a few headers. These headers are added before the required
62	/// headers are, so some might be overwritten.
63	pub fn accept_with(self, custom_headers: &Headers) -> Result<Client<S>, (S, io::Error)> {
64		self.internal_accept(Some(custom_headers))
65	}
66
67	/// Accept the handshake request and send a response,
68	/// if nothing goes wrong a client will be created.
69	pub fn accept_with_limits(self, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
70		self.internal_accept_with_limits(None, max_dataframe_size, max_message_size)
71	}
72
73	/// Accept the handshake request and send a response while
74	/// adding on a few headers. These headers are added before the required
75	/// headers are, so some might be overwritten.
76	pub fn accept_with_headers_and_limits(self, custom_headers: &Headers, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
77		self.internal_accept_with_limits(Some(custom_headers), max_dataframe_size, max_message_size)
78	}
79
80	fn internal_accept(self, headers: Option<&Headers>) -> Result<Client<S>, (S, io::Error)> {
81		self.internal_accept_with_limits(headers, DEFAULT_MAX_DATAFRAME_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
82	}
83
84	fn internal_accept_with_limits(mut self, headers: Option<&Headers>, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
85		let status = self.prepare_headers(headers);
86
87		if let Err(e) = self.send(status) {
88			return Err((self.stream, e));
89		}
90
91		let stream = match self.buffer {
92			Some(Buffer { buf, pos, cap }) => BufReader::from_parts(self.stream, buf, pos, cap),
93			None => BufReader::new(self.stream),
94		};
95
96		Ok(Client::unchecked_with_limits(stream, self.headers, false, true, max_dataframe_size, max_message_size))
97	}
98
99	/// Reject the client's request to make a websocket connection.
100	pub fn reject(self) -> Result<S, (S, io::Error)> {
101		self.internal_reject(None)
102	}
103
104	/// Reject the client's request to make a websocket connection
105	/// and send extra headers.
106	pub fn reject_with(self, headers: &Headers) -> Result<S, (S, io::Error)> {
107		self.internal_reject(Some(headers))
108	}
109
110	fn internal_reject(mut self, headers: Option<&Headers>) -> Result<S, (S, io::Error)> {
111		if let Some(custom) = headers {
112			self.headers.extend(custom.iter());
113		}
114		match self.send(StatusCode::BadRequest) {
115			Ok(()) => Ok(self.stream),
116			Err(e) => Err((self.stream, e)),
117		}
118	}
119}
120
121impl<S, B> WsUpgrade<S, B>
122where
123	S: Stream + AsTcpStream,
124{
125	/// Get a handle to the underlying TCP stream, useful to be able to set
126	/// TCP options, etc.
127	pub fn tcp_stream(&self) -> &TcpStream {
128		self.stream.as_tcp()
129	}
130}
131
132/// Trait to take a stream or similar and attempt to recover the start of a
133/// websocket handshake from it.
134/// Should be used when a stream might contain a request for a websocket session.
135///
136/// If an upgrade request can be parsed, one can accept or deny the handshake with
137/// the `WsUpgrade` struct.
138/// Otherwise the original stream is returned along with an error.
139///
140/// Note: the stream is owned because the websocket client expects to own its stream.
141///
142/// This is already implemented for all Streams, which means all types with Read + Write.
143///
144/// # Example
145///
146/// ```rust,no_run
147/// use std::net::TcpListener;
148/// use std::net::TcpStream;
149/// use websocket::sync::server::upgrade::IntoWs;
150/// use websocket::sync::Client;
151///
152/// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
153///
154/// for stream in listener.incoming().filter_map(Result::ok) {
155///     let mut client: Client<TcpStream> = match stream.into_ws() {
156/// 		    Ok(upgrade) => {
157///             match upgrade.accept() {
158///                 Ok(client) => client,
159///                 Err(_) => panic!(),
160///             }
161///         },
162/// 		    Err(_) => panic!(),
163///     };
164/// }
165/// ```
166pub trait IntoWs {
167	/// The type of stream this upgrade process is working with (TcpStream, etc.)
168	type Stream: Stream;
169	/// An error value in case the stream is not asking for a websocket connection
170	/// or something went wrong. It is common to also include the stream here.
171	type Error;
172	/// Attempt to parse the start of a Websocket handshake, later with the  returned
173	/// `WsUpgrade` struct, call `accept` to start a websocket client, and `reject` to
174	/// send a handshake rejection response.
175	fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error>;
176}
177
178impl<S> IntoWs for S
179where
180	S: Stream,
181{
182	type Stream = S;
183	type Error = (S, Option<Request>, Option<Buffer>, HyperIntoWsError);
184
185	fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
186		let mut reader = BufReader::new(self);
187		let request = parse_request(&mut reader);
188
189		let (stream, buf, pos, cap) = reader.into_parts();
190		let buffer = Some(Buffer { buf, cap, pos });
191
192		let request = match request {
193			Ok(r) => r,
194			Err(e) => return Err((stream, None, buffer, e.into())),
195		};
196
197		match validate(&request.subject.0, request.version, &request.headers) {
198			Ok(_) => Ok(WsUpgrade {
199				headers: Headers::new(),
200				stream,
201				request,
202				buffer,
203			}),
204			Err(e) => Err((stream, Some(request), buffer, e)),
205		}
206	}
207}
208
209impl<S> IntoWs for RequestStreamPair<S>
210where
211	S: Stream,
212{
213	type Stream = S;
214	type Error = (S, Request, HyperIntoWsError);
215
216	fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
217		match validate(&self.1.subject.0, self.1.version, &self.1.headers) {
218			Ok(_) => Ok(WsUpgrade {
219				headers: Headers::new(),
220				stream: self.0,
221				request: self.1,
222				buffer: None,
223			}),
224			Err(e) => Err((self.0, self.1, e)),
225		}
226	}
227}
228
229/// Upgrade a hyper connection to a websocket one.
230///
231/// A hyper request is implicitly defined as a stream from other `impl`s of Stream.
232/// Until trait impl specialization comes along, we use this struct to differentiate
233/// a hyper request (which already has parsed headers) from a normal stream.
234///
235/// Using this method, one can start a hyper server and check if each request
236/// is a websocket upgrade request, if so you can use websockets and hyper on the
237/// same port!
238///
239/// ```rust,no_run
240/// # extern crate hyper;
241/// # extern crate websocket;
242/// # fn main() {
243/// use hyper::server::{Server, Request, Response};
244/// use websocket::Message;
245/// use websocket::sync::server::upgrade::IntoWs;
246/// use websocket::sync::server::upgrade::HyperRequest;
247///
248/// Server::http("0.0.0.0:80").unwrap().handle(move |req: Request, res: Response| {
249///     match HyperRequest(req).into_ws() {
250///         Ok(upgrade) => {
251///             // `accept` sends a successful handshake, no need to worry about res
252///             let mut client = match upgrade.accept() {
253///                 Ok(c) => c,
254///                 Err(_) => panic!(),
255///             };
256///
257///             client.send_message(&Message::text("its free real estate"));
258///         },
259///
260///         Err((request, err)) => {
261///             // continue using the request as normal, "echo uri"
262///             res.send(b"Try connecting over ws instead.").unwrap();
263///         },
264///     };
265/// })
266/// .unwrap();
267/// # }
268/// ```
269pub struct HyperRequest<'a, 'b: 'a>(pub ::hyper::server::Request<'a, 'b>);
270
271impl<'a, 'b> IntoWs for HyperRequest<'a, 'b> {
272	type Stream = &'a mut &'b mut dyn NetworkStream;
273	type Error = (::hyper::server::Request<'a, 'b>, HyperIntoWsError);
274
275	fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
276		if let Err(e) = validate(&self.0.method, self.0.version, &self.0.headers) {
277			return Err((self.0, e));
278		}
279
280		let (_, method, headers, uri, version, reader) = self.0.deconstruct();
281
282		let reader = reader.into_inner();
283		let (buf, pos, cap) = reader.take_buf();
284		let stream = reader.get_mut();
285
286		Ok(Upgrade {
287			headers: Headers::new(),
288			stream,
289			buffer: Some(Buffer { buf, pos, cap }),
290			request: Incoming {
291				version,
292				headers,
293				subject: (method, uri),
294			},
295		})
296	}
297}