1use crate::dataframe::Opcode;
3use crate::result::{WebSocketError, WebSocketResult};
4use crate::ws;
5use crate::ws::dataframe::DataFrame as DataFrameTrait;
6use crate::ws::util::bytes_to_string;
7use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
8use std::borrow::Cow;
9use std::io;
10use std::io::Write;
11use std::str::from_utf8;
12
/// Reserved-bit flags with all three bits unset; returned by the `reserved`
/// implementations in this module.
const FALSE_RESERVED_BITS: &[bool; 3] = &[false, false, false];
14
/// The kind of a message.
///
/// Each variant's discriminant is the numeric opcode that the message
/// reports through `DataFrame::opcode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Type {
    /// A text message; its payload is validated as UTF-8 when assembled
    /// from dataframes.
    Text = 0x1,
    /// A message carrying arbitrary bytes.
    Binary = 0x2,
    /// A close message, optionally carrying a status code and reason.
    Close = 0x8,
    /// A ping message.
    Ping = 0x9,
    /// A pong message.
    Pong = 0xA,
}
29
30#[derive(Eq, PartialEq, Clone, Debug)]
39pub struct Message<'a> {
40 pub opcode: Type,
42 pub cd_status_code: Option<u16>,
45 pub payload: Cow<'a, [u8]>,
47}
48
49impl<'a> Message<'a> {
50 fn new(code: Type, status: Option<u16>, payload: Cow<'a, [u8]>) -> Self {
51 Message {
52 opcode: code,
53 cd_status_code: status,
54 payload,
55 }
56 }
57
58 pub fn text<S>(data: S) -> Self
60 where
61 S: Into<Cow<'a, str>>,
62 {
63 Message::new(
64 Type::Text,
65 None,
66 match data.into() {
67 Cow::Owned(msg) => Cow::Owned(msg.into_bytes()),
68 Cow::Borrowed(msg) => Cow::Borrowed(msg.as_bytes()),
69 },
70 )
71 }
72
73 pub fn binary<B>(data: B) -> Self
75 where
76 B: IntoCowBytes<'a>,
77 {
78 Message::new(Type::Binary, None, data.into())
79 }
80
81 pub fn close() -> Self {
84 Message::new(Type::Close, None, Cow::Borrowed(&[0 as u8; 0]))
85 }
86
87 pub fn close_because<S>(code: u16, reason: S) -> Self
91 where
92 S: Into<Cow<'a, str>>,
93 {
94 Message::new(
95 Type::Close,
96 Some(code),
97 match reason.into() {
98 Cow::Owned(msg) => Cow::Owned(msg.into_bytes()),
99 Cow::Borrowed(msg) => Cow::Borrowed(msg.as_bytes()),
100 },
101 )
102 }
103
104 pub fn ping<P>(data: P) -> Self
107 where
108 P: IntoCowBytes<'a>,
109 {
110 Message::new(Type::Ping, None, data.into())
111 }
112
113 pub fn pong<P>(data: P) -> Self
116 where
117 P: IntoCowBytes<'a>,
118 {
119 Message::new(Type::Pong, None, data.into())
120 }
121
122 #[allow(clippy::wrong_self_convention)]
124 pub fn into_pong(&mut self) -> Result<(), ()> {
127 if self.opcode == Type::Ping {
128 self.opcode = Type::Pong;
129 Ok(())
130 } else {
131 Err(())
132 }
133 }
134}
135
136impl<'a> ws::dataframe::DataFrame for Message<'a> {
137 #[inline(always)]
138 fn is_last(&self) -> bool {
139 true
140 }
141
142 #[inline(always)]
143 fn opcode(&self) -> u8 {
144 self.opcode as u8
145 }
146
147 #[inline(always)]
148 fn reserved(&self) -> &[bool; 3] {
149 FALSE_RESERVED_BITS
150 }
151
152 fn size(&self) -> usize {
153 self.payload.len() + if self.cd_status_code.is_some() { 2 } else { 0 }
154 }
155
156 fn write_payload(&self, socket: &mut dyn Write) -> WebSocketResult<()> {
157 if let Some(reason) = self.cd_status_code {
158 socket.write_u16::<BigEndian>(reason)?;
159 }
160 socket.write_all(&*self.payload)?;
161 Ok(())
162 }
163
164 fn take_payload(self) -> Vec<u8> {
165 if let Some(reason) = self.cd_status_code {
166 let mut buf = Vec::with_capacity(2 + self.payload.len());
167 buf.write_u16::<BigEndian>(reason)
168 .expect("failed to write close code in take_payload");
169 buf.append(&mut self.payload.into_owned());
170 buf
171 } else {
172 self.payload.into_owned()
173 }
174 }
175}
176
177impl<'a> ws::Message for Message<'a> {
178 fn serialize(&self, writer: &mut dyn Write, masked: bool) -> WebSocketResult<()> {
180 self.write_to(writer, masked)
181 }
182
183 fn message_size(&self, masked: bool) -> usize {
185 self.frame_size(masked)
186 }
187
188 fn from_dataframes<D>(frames: Vec<D>) -> WebSocketResult<Self>
190 where
191 D: DataFrameTrait,
192 {
193 let opcode = frames
194 .first()
195 .ok_or(WebSocketError::ProtocolError("No dataframes provided"))
196 .map(ws::dataframe::DataFrame::opcode)?;
197 let opcode = Opcode::new(opcode);
198
199 let payload_size = frames.iter().map(ws::dataframe::DataFrame::size).sum();
200
201 let mut data = Vec::with_capacity(payload_size);
202
203 for (i, dataframe) in frames.into_iter().enumerate() {
204 if i > 0 && dataframe.opcode() != Opcode::Continuation as u8 {
205 return Err(WebSocketError::ProtocolError(
206 "Unexpected non-continuation data frame",
207 ));
208 }
209 if *dataframe.reserved() != [false; 3] {
210 return Err(WebSocketError::ProtocolError(
211 "Unsupported reserved bits received",
212 ));
213 }
214 data.append(&mut dataframe.take_payload());
215 }
216
217 if opcode == Some(Opcode::Text) {
218 if let Err(e) = from_utf8(data.as_slice()) {
219 return Err(e.into());
220 }
221 }
222
223 let msg = match opcode {
224 Some(Opcode::Text) => Message {
225 opcode: Type::Text,
226 cd_status_code: None,
227 payload: Cow::Owned(data),
228 },
229 Some(Opcode::Binary) => Message::binary(data),
230 Some(Opcode::Close) => {
231 if !data.is_empty() {
232 let status_code = (&data[..]).read_u16::<BigEndian>()?;
233 let reason = bytes_to_string(&data[2..])?;
234 Message::close_because(status_code, reason)
235 } else {
236 Message::close()
237 }
238 }
239 Some(Opcode::Ping) => Message::ping(data),
240 Some(Opcode::Pong) => Message::pong(data),
241 _ => return Err(WebSocketError::ProtocolError("Unsupported opcode received")),
242 };
243 Ok(msg)
244 }
245}
246
247#[derive(Eq, PartialEq, Clone, Debug)]
256pub enum OwnedMessage {
257 Text(String),
259 Binary(Vec<u8>),
261 Close(Option<CloseData>),
264 Ping(Vec<u8>),
268 Pong(Vec<u8>),
271}
272
273impl OwnedMessage {
274 pub fn is_close(&self) -> bool {
281 match *self {
282 OwnedMessage::Close(_) => true,
283 _ => false,
284 }
285 }
286
287 pub fn is_control(&self) -> bool {
297 match *self {
298 OwnedMessage::Close(_) => true,
299 OwnedMessage::Ping(_) => true,
300 OwnedMessage::Pong(_) => true,
301 _ => false,
302 }
303 }
304
305 pub fn is_data(&self) -> bool {
314 !self.is_control()
315 }
316
317 pub fn is_ping(&self) -> bool {
326 match *self {
327 OwnedMessage::Ping(_) => true,
328 _ => false,
329 }
330 }
331
332 pub fn is_pong(&self) -> bool {
340 match *self {
341 OwnedMessage::Pong(_) => true,
342 _ => false,
343 }
344 }
345}
346
347impl ws::Message for OwnedMessage {
348 fn serialize(&self, writer: &mut dyn Write, masked: bool) -> WebSocketResult<()> {
350 self.write_to(writer, masked)
351 }
352
353 fn message_size(&self, masked: bool) -> usize {
355 self.frame_size(masked)
356 }
357
358 fn from_dataframes<D>(frames: Vec<D>) -> WebSocketResult<Self>
360 where
361 D: DataFrameTrait,
362 {
363 Ok(Message::from_dataframes(frames)?.into())
364 }
365}
366
367impl ws::dataframe::DataFrame for OwnedMessage {
368 #[inline(always)]
369 fn is_last(&self) -> bool {
370 true
371 }
372
373 #[inline(always)]
374 fn opcode(&self) -> u8 {
375 (match *self {
376 OwnedMessage::Text(_) => Type::Text,
377 OwnedMessage::Binary(_) => Type::Binary,
378 OwnedMessage::Close(_) => Type::Close,
379 OwnedMessage::Ping(_) => Type::Ping,
380 OwnedMessage::Pong(_) => Type::Pong,
381 }) as u8
382 }
383
384 #[inline(always)]
385 fn reserved(&self) -> &[bool; 3] {
386 FALSE_RESERVED_BITS
387 }
388
389 fn size(&self) -> usize {
390 match *self {
391 OwnedMessage::Text(ref txt) => txt.len(),
392 OwnedMessage::Binary(ref bin) => bin.len(),
393 OwnedMessage::Ping(ref data) => data.len(),
394 OwnedMessage::Pong(ref data) => data.len(),
395 OwnedMessage::Close(ref data) => match data {
396 &Some(ref c) => c.reason.len() + 2,
397 &None => 0,
398 },
399 }
400 }
401
402 fn write_payload(&self, socket: &mut dyn Write) -> WebSocketResult<()> {
403 match *self {
404 OwnedMessage::Text(ref txt) => socket.write_all(txt.as_bytes())?,
405 OwnedMessage::Binary(ref bin) => socket.write_all(bin.as_slice())?,
406 OwnedMessage::Ping(ref data) => socket.write_all(data.as_slice())?,
407 OwnedMessage::Pong(ref data) => socket.write_all(data.as_slice())?,
408 OwnedMessage::Close(ref data) => match data {
409 &Some(ref c) => {
410 socket.write_u16::<BigEndian>(c.status_code)?;
411 socket.write_all(c.reason.as_bytes())?
412 }
413 &None => (),
414 },
415 };
416 Ok(())
417 }
418
419 fn take_payload(self) -> Vec<u8> {
420 match self {
421 OwnedMessage::Text(txt) => txt.into_bytes(),
422 OwnedMessage::Binary(bin) => bin,
423 OwnedMessage::Ping(data) => data,
424 OwnedMessage::Pong(data) => data,
425 OwnedMessage::Close(data) => match data {
426 Some(c) => {
427 let mut buf = Vec::with_capacity(2 + c.reason.len());
428 buf.write_u16::<BigEndian>(c.status_code)
429 .expect("failed to write close code in take_payload");
430 buf.append(&mut c.reason.into_bytes());
431 buf
432 }
433 None => vec![],
434 },
435 }
436 }
437}
438
439impl From<String> for OwnedMessage {
440 fn from(text: String) -> Self {
441 OwnedMessage::Text(text)
442 }
443}
444
445impl From<Vec<u8>> for OwnedMessage {
446 fn from(buf: Vec<u8>) -> Self {
447 OwnedMessage::Binary(buf)
448 }
449}
450
451impl<'m> From<Message<'m>> for OwnedMessage {
452 fn from(message: Message<'m>) -> Self {
453 match message.opcode {
454 Type::Text => {
455 let convert = String::from_utf8_lossy(&message.payload).into_owned();
456 OwnedMessage::Text(convert)
457 }
458 Type::Close => match message.cd_status_code {
459 Some(code) => OwnedMessage::Close(Some(CloseData {
460 status_code: code,
461 reason: String::from_utf8_lossy(&message.payload).into_owned(),
462 })),
463 None => OwnedMessage::Close(None),
464 },
465 Type::Binary => OwnedMessage::Binary(message.payload.into_owned()),
466 Type::Ping => OwnedMessage::Ping(message.payload.into_owned()),
467 Type::Pong => OwnedMessage::Pong(message.payload.into_owned()),
468 }
469 }
470}
471
472impl<'m> From<OwnedMessage> for Message<'m> {
473 fn from(message: OwnedMessage) -> Self {
474 match message {
475 OwnedMessage::Text(txt) => Message::text(txt),
476 OwnedMessage::Binary(bin) => Message::binary(bin),
477 OwnedMessage::Close(because) => match because {
478 Some(c) => Message::close_because(c.status_code, c.reason),
479 None => Message::close(),
480 },
481 OwnedMessage::Ping(data) => Message::ping(data),
482 OwnedMessage::Pong(data) => Message::pong(data),
483 }
484 }
485}
486
/// The data carried by a close message: a status code and a reason.
#[derive(Eq, PartialEq, Clone, Debug)]
pub struct CloseData {
    /// The status code of the close.
    pub status_code: u16,
    /// The textual reason for the close.
    pub reason: String,
}

impl CloseData {
    /// Creates close data from a status code and a reason.
    pub fn new(status_code: u16, reason: String) -> CloseData {
        CloseData {
            status_code,
            reason,
        }
    }

    /// Serializes the close data as the big-endian status code followed by
    /// the bytes of the reason.
    ///
    /// # Errors
    ///
    /// The current implementation never fails; the `io::Result` return type
    /// is kept for compatibility with existing callers.
    pub fn into_bytes(self) -> io::Result<Vec<u8>> {
        let reason = self.reason.as_bytes();
        // Exact size is known up front: two bytes of code plus the reason.
        let mut buf = Vec::with_capacity(2 + reason.len());
        buf.extend_from_slice(&self.status_code.to_be_bytes());
        buf.extend_from_slice(reason);
        Ok(buf)
    }
}
514
/// Conversion into a `Cow<[u8]>`, used by the message constructors to accept
/// both owned and borrowed byte data.
pub trait IntoCowBytes<'a> {
    /// Consumes `self` and returns it as a clone-on-write byte slice.
    fn into(self) -> Cow<'a, [u8]>;
}

/// An owned vector becomes `Cow::Owned` without copying.
impl<'a> IntoCowBytes<'a> for Vec<u8> {
    fn into(self) -> Cow<'a, [u8]> {
        Cow::from(self)
    }
}

/// A borrowed slice becomes `Cow::Borrowed` without copying.
impl<'a> IntoCowBytes<'a> for &'a [u8] {
    fn into(self) -> Cow<'a, [u8]> {
        Cow::from(self)
    }
}

/// A `Cow` is passed through unchanged.
impl<'a> IntoCowBytes<'a> for Cow<'a, [u8]> {
    fn into(self) -> Cow<'a, [u8]> {
        self
    }
}