1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
//! Allows you to take an existing request or stream of data and convert it into a
//! WebSocket client.
use crate::client::sync::Client;
use crate::server::upgrade::{validate, HyperIntoWsError, Request, WsUpgrade};
use crate::stream::sync::{AsTcpStream, Stream};
use std::io;
use std::net::TcpStream;
use hyper::buffer::BufReader;
use hyper::header::Headers;
use hyper::http::h1::parse_request;
use hyper::http::h1::Incoming;
use hyper::net::NetworkStream;
use hyper::status::StatusCode;
const DEFAULT_MAX_DATAFRAME_SIZE : usize = 1024*1024*100;
const DEFAULT_MAX_MESSAGE_SIZE : usize = 1024*1024*200;
/// This crate uses buffered readers to read in the handshake quickly, in order to
/// interface with other use cases that don't use buffered readers the buffered readers
/// is deconstructed when it is returned to the user and given as the underlying
/// reader and the buffer.
///
/// This struct represents bytes that have already been read in from the stream.
/// A slice of valid data in this buffer can be obtained by: `&buf[pos..cap]`.
#[derive(Debug)]
pub struct Buffer {
/// the contents of the buffered stream data
pub buf: Vec<u8>,
/// the current position of cursor in the buffer
/// Any data before `pos` has already been read and parsed.
pub pos: usize,
/// the last location of valid data
/// Any data after `cap` is not valid.
pub cap: usize,
}
/// If you have your requests separate from your stream you can use this struct
/// to upgrade the connection based on the request given
/// (the request should be a handshake).
pub struct RequestStreamPair<S: Stream>(pub S, pub Request);
/// The synchronous specialization of `WsUpgrade`.
/// See the `WsUpgrade` docs for usage and the extra synchronous methods
/// given by this specialization.
pub type Upgrade<S> = WsUpgrade<S, Option<Buffer>>;
/// These methods are the synchronous ways of accepting and rejecting a websocket
/// handshake.
impl<S> WsUpgrade<S, Option<Buffer>>
where
S: Stream,
{
/// Accept the handshake request and send a response,
/// if nothing goes wrong a client will be created.
pub fn accept(self) -> Result<Client<S>, (S, io::Error)> {
self.internal_accept(None)
}
/// Accept the handshake request and send a response while
/// adding on a few headers. These headers are added before the required
/// headers are, so some might be overwritten.
pub fn accept_with(self, custom_headers: &Headers) -> Result<Client<S>, (S, io::Error)> {
self.internal_accept(Some(custom_headers))
}
/// Accept the handshake request and send a response,
/// if nothing goes wrong a client will be created.
pub fn accept_with_limits(self, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
self.internal_accept_with_limits(None, max_dataframe_size, max_message_size)
}
/// Accept the handshake request and send a response while
/// adding on a few headers. These headers are added before the required
/// headers are, so some might be overwritten.
pub fn accept_with_headers_and_limits(self, custom_headers: &Headers, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
self.internal_accept_with_limits(Some(custom_headers), max_dataframe_size, max_message_size)
}
fn internal_accept(self, headers: Option<&Headers>) -> Result<Client<S>, (S, io::Error)> {
self.internal_accept_with_limits(headers, DEFAULT_MAX_DATAFRAME_SIZE, DEFAULT_MAX_MESSAGE_SIZE)
}
fn internal_accept_with_limits(mut self, headers: Option<&Headers>, max_dataframe_size: usize, max_message_size: usize) -> Result<Client<S>, (S, io::Error)> {
let status = self.prepare_headers(headers);
if let Err(e) = self.send(status) {
return Err((self.stream, e));
}
let stream = match self.buffer {
Some(Buffer { buf, pos, cap }) => BufReader::from_parts(self.stream, buf, pos, cap),
None => BufReader::new(self.stream),
};
Ok(Client::unchecked_with_limits(stream, self.headers, false, true, max_dataframe_size, max_message_size))
}
/// Reject the client's request to make a websocket connection.
pub fn reject(self) -> Result<S, (S, io::Error)> {
self.internal_reject(None)
}
/// Reject the client's request to make a websocket connection
/// and send extra headers.
pub fn reject_with(self, headers: &Headers) -> Result<S, (S, io::Error)> {
self.internal_reject(Some(headers))
}
fn internal_reject(mut self, headers: Option<&Headers>) -> Result<S, (S, io::Error)> {
if let Some(custom) = headers {
self.headers.extend(custom.iter());
}
match self.send(StatusCode::BadRequest) {
Ok(()) => Ok(self.stream),
Err(e) => Err((self.stream, e)),
}
}
}
impl<S, B> WsUpgrade<S, B>
where
S: Stream + AsTcpStream,
{
/// Get a handle to the underlying TCP stream, useful to be able to set
/// TCP options, etc.
pub fn tcp_stream(&self) -> &TcpStream {
self.stream.as_tcp()
}
}
/// Trait to take a stream or similar and attempt to recover the start of a
/// websocket handshake from it.
/// Should be used when a stream might contain a request for a websocket session.
///
/// If an upgrade request can be parsed, one can accept or deny the handshake with
/// the `WsUpgrade` struct.
/// Otherwise the original stream is returned along with an error.
///
/// Note: the stream is owned because the websocket client expects to own its stream.
///
/// This is already implemented for all Streams, which means all types with Read + Write.
///
/// # Example
///
/// ```rust,no_run
/// use std::net::TcpListener;
/// use std::net::TcpStream;
/// use websocket::sync::server::upgrade::IntoWs;
/// use websocket::sync::Client;
///
/// let listener = TcpListener::bind("127.0.0.1:80").unwrap();
///
/// for stream in listener.incoming().filter_map(Result::ok) {
/// let mut client: Client<TcpStream> = match stream.into_ws() {
/// Ok(upgrade) => {
/// match upgrade.accept() {
/// Ok(client) => client,
/// Err(_) => panic!(),
/// }
/// },
/// Err(_) => panic!(),
/// };
/// }
/// ```
pub trait IntoWs {
/// The type of stream this upgrade process is working with (TcpStream, etc.)
type Stream: Stream;
/// An error value in case the stream is not asking for a websocket connection
/// or something went wrong. It is common to also include the stream here.
type Error;
/// Attempt to parse the start of a Websocket handshake, later with the returned
/// `WsUpgrade` struct, call `accept` to start a websocket client, and `reject` to
/// send a handshake rejection response.
fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error>;
}
impl<S> IntoWs for S
where
S: Stream,
{
type Stream = S;
type Error = (S, Option<Request>, Option<Buffer>, HyperIntoWsError);
fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
let mut reader = BufReader::new(self);
let request = parse_request(&mut reader);
let (stream, buf, pos, cap) = reader.into_parts();
let buffer = Some(Buffer { buf, cap, pos });
let request = match request {
Ok(r) => r,
Err(e) => return Err((stream, None, buffer, e.into())),
};
match validate(&request.subject.0, request.version, &request.headers) {
Ok(_) => Ok(WsUpgrade {
headers: Headers::new(),
stream,
request,
buffer,
}),
Err(e) => Err((stream, Some(request), buffer, e)),
}
}
}
impl<S> IntoWs for RequestStreamPair<S>
where
S: Stream,
{
type Stream = S;
type Error = (S, Request, HyperIntoWsError);
fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
match validate(&self.1.subject.0, self.1.version, &self.1.headers) {
Ok(_) => Ok(WsUpgrade {
headers: Headers::new(),
stream: self.0,
request: self.1,
buffer: None,
}),
Err(e) => Err((self.0, self.1, e)),
}
}
}
/// Upgrade a hyper connection to a websocket one.
///
/// A hyper request is implicitly defined as a stream from other `impl`s of Stream.
/// Until trait impl specialization comes along, we use this struct to differentiate
/// a hyper request (which already has parsed headers) from a normal stream.
///
/// Using this method, one can start a hyper server and check if each request
/// is a websocket upgrade request, if so you can use websockets and hyper on the
/// same port!
///
/// ```rust,no_run
/// # extern crate hyper;
/// # extern crate websocket;
/// # fn main() {
/// use hyper::server::{Server, Request, Response};
/// use websocket::Message;
/// use websocket::sync::server::upgrade::IntoWs;
/// use websocket::sync::server::upgrade::HyperRequest;
///
/// Server::http("0.0.0.0:80").unwrap().handle(move |req: Request, res: Response| {
/// match HyperRequest(req).into_ws() {
/// Ok(upgrade) => {
/// // `accept` sends a successful handshake, no need to worry about res
/// let mut client = match upgrade.accept() {
/// Ok(c) => c,
/// Err(_) => panic!(),
/// };
///
/// client.send_message(&Message::text("its free real estate"));
/// },
///
/// Err((request, err)) => {
/// // continue using the request as normal, "echo uri"
/// res.send(b"Try connecting over ws instead.").unwrap();
/// },
/// };
/// })
/// .unwrap();
/// # }
/// ```
pub struct HyperRequest<'a, 'b: 'a>(pub ::hyper::server::Request<'a, 'b>);
impl<'a, 'b> IntoWs for HyperRequest<'a, 'b> {
type Stream = &'a mut &'b mut dyn NetworkStream;
type Error = (::hyper::server::Request<'a, 'b>, HyperIntoWsError);
fn into_ws(self) -> Result<Upgrade<Self::Stream>, Self::Error> {
if let Err(e) = validate(&self.0.method, self.0.version, &self.0.headers) {
return Err((self.0, e));
}
let (_, method, headers, uri, version, reader) = self.0.deconstruct();
let reader = reader.into_inner();
let (buf, pos, cap) = reader.take_buf();
let stream = reader.get_mut();
Ok(Upgrade {
headers: Headers::new(),
stream,
buffer: Some(Buffer { buf, pos, cap }),
request: Incoming {
version,
headers,
subject: (method, uri),
},
})
}
}