1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
use std::{
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use actix_http::ws::{CloseReason, Item, Message};
use actix_web::web::Bytes;
use bytestring::ByteString;
use tokio::sync::mpsc::Sender;
/// A handle into the websocket session.
///
/// This type can be used to send messages into the WebSocket.
#[derive(Clone)]
pub struct Session {
inner: Option<Sender<Message>>,
closed: Arc<AtomicBool>,
}
/// The error representing a closed websocket session
#[derive(Debug)]
pub struct Closed;
impl fmt::Display for Closed {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Session is closed")
}
}
impl std::error::Error for Closed {}
impl Session {
pub(super) fn new(inner: Sender<Message>) -> Self {
Session {
inner: Some(inner),
closed: Arc::new(AtomicBool::new(false)),
}
}
fn pre_check(&mut self) {
if self.closed.load(Ordering::Relaxed) {
self.inner.take();
}
}
/// Sends text into the WebSocket.
///
/// ```no_run
/// # use actix_ws::Session;
/// # async fn test(mut session: Session) {
/// if session.text("Some text").await.is_err() {
/// // session closed
/// }
/// # }
/// ```
pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Text(msg.into()))
.await
.map_err(|_| Closed)
} else {
Err(Closed)
}
}
/// Sends raw bytes into the WebSocket.
///
/// ```no_run
/// # use actix_ws::Session;
/// # async fn test(mut session: Session) {
/// if session.binary(&b"some bytes"[..]).await.is_err() {
/// // session closed
/// }
/// # }
/// ```
pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Binary(msg.into()))
.await
.map_err(|_| Closed)
} else {
Err(Closed)
}
}
/// Pings the client.
///
/// For many applications, it will be important to send regular pings to keep track of if the
/// client has disconnected
///
/// ```no_run
/// # use actix_ws::Session;
/// # async fn test(mut session: Session) {
/// if session.ping(b"").await.is_err() {
/// // session is closed
/// }
/// # }
/// ```
pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Ping(Bytes::copy_from_slice(msg)))
.await
.map_err(|_| Closed)
} else {
Err(Closed)
}
}
/// Pongs the client.
///
/// ```no_run
/// # use actix_ws::{Message, Session};
/// # async fn test(mut session: Session, msg: Message) {
/// match msg {
/// Message::Ping(bytes) => {
/// let _ = session.pong(&bytes).await;
/// }
/// _ => (),
/// }
/// # }
pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Pong(Bytes::copy_from_slice(msg)))
.await
.map_err(|_| Closed)
} else {
Err(Closed)
}
}
/// Manually controls sending continuations.
///
/// Be wary of this method. Continuations represent multiple frames that, when combined, are
/// presented as a single message. They are useful when the entire contents of a message are
/// not available all at once. However, continuations MUST NOT be interrupted by other Text or
/// Binary messages. Control messages such as Ping, Pong, or Close are allowed to interrupt a
/// continuation.
///
/// Continuations must be initialized with a First variant, and must be terminated by a Last
/// variant, with only Continue variants sent in between.
///
/// ```no_run
/// # use actix_ws::{Item, Session};
/// # async fn test(mut session: Session) -> Result<(), Box<dyn std::error::Error>> {
/// session.continuation(Item::FirstText("Hello".into())).await?;
/// session.continuation(Item::Continue(b", World"[..].into())).await?;
/// session.continuation(Item::Last(b"!"[..].into())).await?;
/// # Ok(())
/// # }
/// ```
pub async fn continuation(&mut self, msg: Item) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Continuation(msg))
.await
.map_err(|_| Closed)
} else {
Err(Closed)
}
}
/// Sends a close message, and consumes the session.
///
/// All clones will return `Err(Closed)` if used after this call.
///
/// ```no_run
/// # use actix_ws::{Closed, Session};
/// # async fn test(mut session: Session) -> Result<(), Closed> {
/// session.close(None).await
/// # }
/// ```
pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.take() {
self.closed.store(true, Ordering::Relaxed);
inner.send(Message::Close(reason)).await.map_err(|_| Closed)
} else {
Err(Closed)
}
}
}