//! WebSockets for Actix Web, without actors.
//!
//! For usage, see documentation on [`handle()`].
#![warn(missing_docs)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
pub use actix_http::ws::{CloseCode, CloseReason, Item, Message, ProtocolError};
use actix_http::{
body::{BodyStream, MessageBody},
ws::handshake,
};
use actix_web::{web, HttpRequest, HttpResponse};
use tokio::sync::mpsc::channel;
mod aggregated;
mod session;
mod stream;
pub use self::{
aggregated::{AggregatedMessage, AggregatedMessageStream},
session::{Closed, Session},
stream::{MessageStream, StreamingBody},
};
/// Upgrades an HTTP request to a WebSocket connection and returns the pieces needed to drive it.
///
/// On success the returned tuple holds, in order:
///
/// - the `HttpResponse` to return from the request handler,
/// - a [`Session`] used to send messages to the client, and
/// - a [`MessageStream`] yielding the messages received from the client.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake for `req` fails, or if the streaming body cannot
/// be attached to the handshake response.
///
/// # Examples
///
/// ```no_run
/// use std::io;
/// use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder};
/// use actix_ws::Message;
/// use futures_util::StreamExt as _;
///
/// async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result<impl Responder> {
///     let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
///
///     actix_web::rt::spawn(async move {
///         while let Some(Ok(msg)) = msg_stream.next().await {
///             match msg {
///                 Message::Ping(bytes) => {
///                     if session.pong(&bytes).await.is_err() {
///                         return;
///                     }
///                 }
///
///                 Message::Text(msg) => println!("Got text: {msg}"),
///                 _ => break,
///             }
///         }
///
///         let _ = session.close(None).await;
///     });
///
///     Ok(response)
/// }
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> io::Result<()> {
///     HttpServer::new(move || {
///         App::new()
///             .route("/ws", web::get().to(ws))
///             .wrap(Logger::default())
///     })
///     .bind(("127.0.0.1", 8080))?
///     .run()
///     .await
/// }
/// ```
pub fn handle(
    req: &HttpRequest,
    body: web::Payload,
) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> {
    // Capacity of the channel that carries outgoing data from the `Session` to the response body.
    const OUTGOING_BUFFER: usize = 32;

    // Validate the upgrade request first; nothing else is set up if the handshake is rejected.
    let mut builder = handshake(req.head())?;

    // The sender half goes to the session, the receiver half feeds the streamed response body.
    let (outgoing_tx, outgoing_rx) = channel(OUTGOING_BUFFER);

    let outgoing_body = BodyStream::new(StreamingBody::new(outgoing_rx)).boxed();
    let response: HttpResponse = builder.message_body(outgoing_body)?.into();

    let session = Session::new(outgoing_tx);
    let incoming = MessageStream::new(body.into_inner());

    Ok((response, session, incoming))
}