use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use dataframe::Opcode;
use result::{WebSocketError, WebSocketResult};
use std::borrow::Cow;
use std::io;
use std::io::Write;
use std::str::from_utf8;
use ws;
use ws::dataframe::DataFrame as DataFrameTrait;
use ws::util::bytes_to_string;
/// Reserved-bit flags with all three bits cleared; returned by `reserved()`
/// for every message type defined in this file.
const FALSE_RESERVED_BITS: &[bool; 3] = &[false, false, false];
/// The kind of a message.
///
/// Each variant carries an explicit discriminant; casting a variant with
/// `as u8` yields that value, which is what `Message` reports as its opcode.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Type {
    /// A text message (discriminant 1).
    Text = 0x1,
    /// A binary message (discriminant 2).
    Binary = 0x2,
    /// A close message (discriminant 8).
    Close = 0x8,
    /// A ping message (discriminant 9).
    Ping = 0x9,
    /// A pong message (discriminant 10).
    Pong = 0xA,
}
/// A message whose payload is held in a `Cow`, so it may either borrow its
/// bytes for the lifetime `'a` or own them.
#[derive(Eq, PartialEq, Clone, Debug)]
pub struct Message<'a> {
/// The kind of message.
pub opcode: Type,
/// Optional status code. When present it is written as a big-endian `u16`
/// in front of the payload bytes and counted as two extra bytes of size.
pub cd_status_code: Option<u16>,
/// The payload bytes (for text messages, the bytes of the string).
pub payload: Cow<'a, [u8]>,
}
impl<'a> Message<'a> {
    /// Assembles a message from its opcode, optional status code and payload.
    fn new(code: Type, status: Option<u16>, payload: Cow<'a, [u8]>) -> Self {
        Message {
            payload,
            cd_status_code: status,
            opcode: code,
        }
    }

    /// Creates a text message.
    ///
    /// A borrowed string stays borrowed and an owned string is converted
    /// into its byte vector, so the text is not copied in either case.
    pub fn text<S>(data: S) -> Self
    where
        S: Into<Cow<'a, str>>,
    {
        let bytes: Cow<'a, [u8]> = match data.into() {
            Cow::Borrowed(text) => Cow::Borrowed(text.as_bytes()),
            Cow::Owned(text) => Cow::Owned(text.into_bytes()),
        };
        Message::new(Type::Text, None, bytes)
    }

    /// Creates a binary message from anything implementing `IntoCowBytes`.
    pub fn binary<B>(data: B) -> Self
    where
        B: IntoCowBytes<'a>,
    {
        let bytes = data.into();
        Message::new(Type::Binary, None, bytes)
    }

    /// Creates a close message with no status code and an empty payload.
    pub fn close() -> Self {
        const NO_PAYLOAD: &[u8] = &[];
        Message::new(Type::Close, None, Cow::Borrowed(NO_PAYLOAD))
    }

    /// Creates a close message carrying a status code and a textual reason.
    ///
    /// The reason is stored as the payload bytes without copying, in the
    /// same way as `text`.
    pub fn close_because<S>(code: u16, reason: S) -> Self
    where
        S: Into<Cow<'a, str>>,
    {
        let bytes: Cow<'a, [u8]> = match reason.into() {
            Cow::Borrowed(text) => Cow::Borrowed(text.as_bytes()),
            Cow::Owned(text) => Cow::Owned(text.into_bytes()),
        };
        Message::new(Type::Close, Some(code), bytes)
    }

    /// Creates a ping message with the given payload.
    pub fn ping<P>(data: P) -> Self
    where
        P: IntoCowBytes<'a>,
    {
        let bytes = data.into();
        Message::new(Type::Ping, None, bytes)
    }

    /// Creates a pong message with the given payload.
    pub fn pong<P>(data: P) -> Self
    where
        P: IntoCowBytes<'a>,
    {
        let bytes = data.into();
        Message::new(Type::Pong, None, bytes)
    }

    /// Turns a ping into a pong in place, keeping its payload.
    ///
    /// Returns `Err(())` and leaves the message untouched when it is not a
    /// ping.
    #[cfg_attr(feature = "cargo-clippy", allow(wrong_self_convention))]
    pub fn into_pong(&mut self) -> Result<(), ()> {
        match self.opcode {
            Type::Ping => {
                self.opcode = Type::Pong;
                Ok(())
            }
            _ => Err(()),
        }
    }
}
/// Lets a `Message` be treated as a single data frame.
impl<'a> ws::dataframe::DataFrame for Message<'a> {
    /// Always `true`.
    #[inline(always)]
    fn is_last(&self) -> bool {
        true
    }

    /// The numeric value of this message's `Type`.
    #[inline(always)]
    fn opcode(&self) -> u8 {
        self.opcode as u8
    }

    /// Always the all-cleared reserved bits.
    #[inline(always)]
    fn reserved(&self) -> &[bool; 3] {
        FALSE_RESERVED_BITS
    }

    /// Number of bytes `write_payload` emits: the payload length, plus two
    /// bytes when a status code is present.
    fn size(&self) -> usize {
        let status_len = match self.cd_status_code {
            Some(_) => 2,
            None => 0,
        };
        self.payload.len() + status_len
    }

    /// Writes the optional status code (big-endian `u16`) followed by the
    /// payload bytes to `socket`.
    ///
    /// # Errors
    /// Propagates any error returned by the underlying writes.
    fn write_payload(&self, socket: &mut Write) -> WebSocketResult<()> {
        if let Some(reason) = self.cd_status_code {
            socket.write_u16::<BigEndian>(reason)?;
        }
        socket.write_all(&*self.payload)?;
        Ok(())
    }

    /// Consumes the message and returns the same bytes `write_payload`
    /// would emit.
    ///
    /// Without a status code an owned payload is returned as-is. With one,
    /// the payload is copied straight from the `Cow` into the output buffer,
    /// so a borrowed payload is not first cloned into a temporary `Vec`.
    fn take_payload(self) -> Vec<u8> {
        match self.cd_status_code {
            Some(reason) => {
                let mut buf: Vec<u8> = Vec::with_capacity(2 + self.payload.len());
                buf.write_u16::<BigEndian>(reason)
                    .expect("failed to write close code in take_payload");
                buf.extend_from_slice(&self.payload);
                buf
            }
            None => self.payload.into_owned(),
        }
    }
}
/// Serialization of a `Message` and reassembly of one from data frames.
impl<'a> ws::Message for Message<'a> {
    /// Writes this message to `writer` by delegating to `write_to`.
    fn serialize(&self, writer: &mut Write, masked: bool) -> WebSocketResult<()> {
        self.write_to(writer, masked)
    }

    /// Reports the size of this message by delegating to `frame_size`.
    fn message_size(&self, masked: bool) -> usize {
        self.frame_size(masked)
    }

    /// Reassembles a message from a sequence of data frames.
    ///
    /// The opcode of the first frame decides the kind of message; the
    /// payloads of all frames are concatenated in order.
    ///
    /// # Errors
    /// Returns a `ProtocolError` when `frames` is empty, when any frame after
    /// the first is not a continuation frame, when any frame has a reserved
    /// bit set, or when the first opcode is not text, binary, close, ping or
    /// pong. Also fails when a text payload is not valid UTF-8, or when a
    /// non-empty close payload cannot be split into a status code and reason.
    fn from_dataframes<D>(frames: Vec<D>) -> WebSocketResult<Self>
    where
        D: DataFrameTrait,
    {
        let first_opcode = match frames.first() {
            Some(frame) => frame.opcode(),
            None => return Err(WebSocketError::ProtocolError("No dataframes provided")),
        };
        let kind = Opcode::new(first_opcode);

        // Reserve room for the whole payload up front.
        let total_len: usize = frames.iter().map(|frame| frame.size()).sum();
        let mut data = Vec::with_capacity(total_len);

        for (index, frame) in frames.into_iter().enumerate() {
            if index != 0 && frame.opcode() != Opcode::Continuation as u8 {
                return Err(WebSocketError::ProtocolError(
                    "Unexpected non-continuation data frame",
                ));
            }
            if frame.reserved().iter().any(|&bit| bit) {
                return Err(WebSocketError::ProtocolError(
                    "Unsupported reserved bits received",
                ));
            }
            let mut chunk = frame.take_payload();
            data.append(&mut chunk);
        }

        match kind {
            Some(Opcode::Text) => {
                // The payload is kept as bytes; it only has to validate.
                if let Err(utf8_error) = from_utf8(data.as_slice()) {
                    return Err(utf8_error.into());
                }
                Ok(Message {
                    opcode: Type::Text,
                    cd_status_code: None,
                    payload: Cow::Owned(data),
                })
            }
            Some(Opcode::Binary) => Ok(Message::binary(data)),
            Some(Opcode::Close) => {
                if data.is_empty() {
                    return Ok(Message::close());
                }
                // First two bytes are the status code, the rest is the reason.
                let status_code = (&data[..]).read_u16::<BigEndian>()?;
                let reason = bytes_to_string(&data[2..])?;
                Ok(Message::close_because(status_code, reason))
            }
            Some(Opcode::Ping) => Ok(Message::ping(data)),
            Some(Opcode::Pong) => Ok(Message::pong(data)),
            _ => Err(WebSocketError::ProtocolError("Unsupported opcode received")),
        }
    }
}
/// A message that owns its contents, with one variant per kind of message.
#[derive(Eq, PartialEq, Clone, Debug)]
pub enum OwnedMessage {
/// A text message holding its string.
Text(String),
/// A binary message holding its bytes.
Binary(Vec<u8>),
/// A close message, optionally carrying a status code and reason.
Close(Option<CloseData>),
/// A ping message holding its payload bytes.
Ping(Vec<u8>),
/// A pong message holding its payload bytes.
Pong(Vec<u8>),
}
impl OwnedMessage {
    /// Returns `true` only for `Close` messages.
    pub fn is_close(&self) -> bool {
        if let OwnedMessage::Close(_) = *self {
            true
        } else {
            false
        }
    }

    /// Returns `true` for `Close`, `Ping` and `Pong` messages.
    pub fn is_control(&self) -> bool {
        match *self {
            OwnedMessage::Close(_) | OwnedMessage::Ping(_) | OwnedMessage::Pong(_) => true,
            _ => false,
        }
    }

    /// Returns `true` for every message that is not a control message.
    pub fn is_data(&self) -> bool {
        !self.is_control()
    }

    /// Returns `true` only for `Ping` messages.
    pub fn is_ping(&self) -> bool {
        if let OwnedMessage::Ping(_) = *self {
            true
        } else {
            false
        }
    }

    /// Returns `true` only for `Pong` messages.
    pub fn is_pong(&self) -> bool {
        if let OwnedMessage::Pong(_) = *self {
            true
        } else {
            false
        }
    }
}
/// Serialization of an `OwnedMessage` and reassembly of one from data frames.
impl ws::Message for OwnedMessage {
    /// Writes this message to `writer` by delegating to `write_to`.
    fn serialize(&self, writer: &mut Write, masked: bool) -> WebSocketResult<()> {
        self.write_to(writer, masked)
    }

    /// Reports the size of this message by delegating to `frame_size`.
    fn message_size(&self, masked: bool) -> usize {
        self.frame_size(masked)
    }

    /// Builds a `Message` from `frames` and converts it into its owned form.
    ///
    /// # Errors
    /// Returns whatever error `Message::from_dataframes` reports.
    fn from_dataframes<D>(frames: Vec<D>) -> WebSocketResult<Self>
    where
        D: DataFrameTrait,
    {
        let message = Message::from_dataframes(frames)?;
        Ok(message.into())
    }
}
impl ws::dataframe::DataFrame for OwnedMessage {
#[inline(always)]
fn is_last(&self) -> bool {
true
}
#[inline(always)]
fn opcode(&self) -> u8 {
(match *self {
OwnedMessage::Text(_) => Type::Text,
OwnedMessage::Binary(_) => Type::Binary,
OwnedMessage::Close(_) => Type::Close,
OwnedMessage::Ping(_) => Type::Ping,
OwnedMessage::Pong(_) => Type::Pong,
}) as u8
}
#[inline(always)]
fn reserved(&self) -> &[bool; 3] {
FALSE_RESERVED_BITS
}
fn size(&self) -> usize {
match *self {
OwnedMessage::Text(ref txt) => txt.len(),
OwnedMessage::Binary(ref bin) => bin.len(),
OwnedMessage::Ping(ref data) => data.len(),
OwnedMessage::Pong(ref data) => data.len(),
OwnedMessage::Close(ref data) => match data {
&Some(ref c) => c.reason.len() + 2,
&None => 0,
},
}
}
fn write_payload(&self, socket: &mut Write) -> WebSocketResult<()> {
match *self {
OwnedMessage::Text(ref txt) => socket.write_all(txt.as_bytes())?,
OwnedMessage::Binary(ref bin) => socket.write_all(bin.as_slice())?,
OwnedMessage::Ping(ref data) => socket.write_all(data.as_slice())?,
OwnedMessage::Pong(ref data) => socket.write_all(data.as_slice())?,
OwnedMessage::Close(ref data) => match data {
&Some(ref c) => {
socket.write_u16::<BigEndian>(c.status_code)?;
socket.write_all(c.reason.as_bytes())?
}
&None => (),
},
};
Ok(())
}
fn take_payload(self) -> Vec<u8> {
match self {
OwnedMessage::Text(txt) => txt.into_bytes(),
OwnedMessage::Binary(bin) => bin,
OwnedMessage::Ping(data) => data,
OwnedMessage::Pong(data) => data,
OwnedMessage::Close(data) => match data {
Some(c) => {
let mut buf = Vec::with_capacity(2 + c.reason.len());
buf.write_u16::<BigEndian>(c.status_code)
.expect("failed to write close code in take_payload");
buf.append(&mut c.reason.into_bytes());
buf
}
None => vec![],
},
}
}
}
/// An owned string converts into a text message.
impl From<String> for OwnedMessage {
    fn from(contents: String) -> OwnedMessage {
        OwnedMessage::Text(contents)
    }
}
/// An owned byte vector converts into a binary message.
impl From<Vec<u8>> for OwnedMessage {
    fn from(bytes: Vec<u8>) -> OwnedMessage {
        OwnedMessage::Binary(bytes)
    }
}
/// Converts a `Message` into its owned counterpart.
impl<'m> From<Message<'m>> for OwnedMessage {
    /// Maps each message kind onto the matching `OwnedMessage` variant.
    ///
    /// Text payloads and close reasons are decoded as UTF-8, with invalid
    /// sequences replaced by U+FFFD. A close message without a status code
    /// becomes `Close(None)` and its payload is dropped.
    fn from(message: Message<'m>) -> Self {
        /// Turns owned bytes into a `String`, reusing the allocation when
        /// the bytes are already valid UTF-8 and falling back to a lossy
        /// conversion otherwise.
        fn lossy_string(bytes: Vec<u8>) -> String {
            match String::from_utf8(bytes) {
                Ok(text) => text,
                Err(err) => String::from_utf8_lossy(&err.into_bytes()).into_owned(),
            }
        }

        match message.opcode {
            Type::Text => OwnedMessage::Text(lossy_string(message.payload.into_owned())),
            Type::Close => match message.cd_status_code {
                Some(code) => OwnedMessage::Close(Some(CloseData {
                    status_code: code,
                    reason: lossy_string(message.payload.into_owned()),
                })),
                None => OwnedMessage::Close(None),
            },
            Type::Binary => OwnedMessage::Binary(message.payload.into_owned()),
            Type::Ping => OwnedMessage::Ping(message.payload.into_owned()),
            Type::Pong => OwnedMessage::Pong(message.payload.into_owned()),
        }
    }
}
impl<'m> From<OwnedMessage> for Message<'m> {
fn from(message: OwnedMessage) -> Self {
match message {
OwnedMessage::Text(txt) => Message::text(txt),
OwnedMessage::Binary(bin) => Message::binary(bin),
OwnedMessage::Close(because) => match because {
Some(c) => Message::close_because(c.status_code, c.reason),
None => Message::close(),
},
OwnedMessage::Ping(data) => Message::ping(data),
OwnedMessage::Pong(data) => Message::pong(data),
}
}
}
/// The data carried by a close message: a status code and a textual reason.
#[derive(Eq, PartialEq, Clone, Debug)]
pub struct CloseData {
/// The status code; serialized as a big-endian `u16`.
pub status_code: u16,
/// The reason text; serialized as its UTF-8 bytes after the status code.
pub reason: String,
}
impl CloseData {
pub fn new(status_code: u16, reason: String) -> CloseData {
CloseData {
status_code,
reason,
}
}
pub fn into_bytes(self) -> io::Result<Vec<u8>> {
let mut buf = Vec::new();
buf.write_u16::<BigEndian>(self.status_code)?;
for i in self.reason.as_bytes().iter() {
buf.push(*i);
}
Ok(buf)
}
}
/// Conversion of a value into a `Cow<'a, [u8]>`.
///
/// Used as the bound on the payload argument of `Message::binary`,
/// `Message::ping` and `Message::pong`.
pub trait IntoCowBytes<'a> {
/// Consumes `self` and returns its bytes as a `Cow<'a, [u8]>`.
fn into(self) -> Cow<'a, [u8]>;
}
/// An owned byte vector becomes an owned `Cow` without copying.
impl<'a> IntoCowBytes<'a> for Vec<u8> {
    fn into(self) -> Cow<'a, [u8]> {
        Cow::from(self)
    }
}
/// A byte slice becomes a borrowed `Cow` pointing at the same bytes.
impl<'a> IntoCowBytes<'a> for &'a [u8] {
    fn into(self) -> Cow<'a, [u8]> {
        Cow::from(self)
    }
}
/// A value that is already a `Cow<'a, [u8]>` is returned unchanged.
impl<'a> IntoCowBytes<'a> for Cow<'a, [u8]> {
fn into(self) -> Cow<'a, [u8]> {
self
}
}