use std::rc::Rc;
use std::sync::{mpsc as sync_mpsc, Arc};
use std::time::Duration;
use std::{io, net};
use actix::{
fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture,
};
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use mio;
use net2::TcpBuilder;
use num_cpus;
use slab::Slab;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "tls")]
use native_tls::TlsAcceptor;
#[cfg(feature = "alpn")]
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
#[cfg(feature = "rust-tls")]
use rustls::ServerConfig;
use super::accept::{start_accept_thread, Command};
use super::channel::{HttpChannel, WrapperStream};
use super::settings::{ServerSettings, WorkerSettings};
use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker};
use super::{IntoHttpHandler, IoStream, KeepAlive};
use super::{PauseServer, ResumeServer, StopServer};
/// Enables ALPN negotiation on `builder`.
///
/// Advertises `h2` and `http/1.1`, then installs a selection callback that
/// answers `h2` whenever the peer's protocol list contains the `h2` entry and
/// `AlpnError::NOACK` otherwise.
#[cfg(feature = "alpn")]
fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> {
    // Length-prefixed `h2` entry, in the same form as the advertised list below.
    const H2: &[u8] = b"\x02h2";

    builder.set_alpn_protos(b"\x02h2\x08http/1.1")?;
    builder.set_alpn_select_callback(|_, offered| {
        let wants_h2 = offered.windows(H2.len()).any(|entry| entry == H2);
        if wants_h2 {
            Ok(b"h2")
        } else {
            Err(AlpnError::NOACK)
        }
    });
    Ok(())
}
/// An HTTP server actor: holds the listener configuration, the handler
/// factory and the bookkeeping for the workers and accept thread it starts.
pub struct HttpServer<H>
where
H: IntoHttpHandler + 'static,
{
/// Worker settings used by the `Conn` handler; only `start_incoming` sets it.
h: Option<Rc<WorkerSettings<H::Handler>>>,
/// Number of workers to start; `new` initialises it from `num_cpus::get()`.
threads: usize,
/// Backlog handed to `create_tcp_listener` when binding; `new` sets 2048.
backlog: i32,
/// Optional host name forwarded to `ServerSettings::new`.
host: Option<String>,
/// Keep-alive setting passed on to every worker.
keep_alive: KeepAlive,
/// Builds the list of handlers; invoked once for each worker that is started.
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
/// `(index, address)` pairs of the workers currently tracked by the server.
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
/// Listeners registered so far; `start` drains them.
sockets: Vec<Socket>,
/// Accept-thread handles stored by `start`: readiness handle, command sender
/// and the slab of per-socket information.
accept: Option<(
mio::SetReadiness,
sync_mpsc::Sender<Command>,
Slab<SocketInfo>,
)>,
/// When `true`, the actix `System` is stopped after the server has stopped.
exit: bool,
/// Graceful-stop timeout in seconds; `new` sets 30.
shutdown_timeout: u16,
/// Explicit signal actor to subscribe to, if one was supplied.
signals: Option<Addr<signal::ProcessSignals>>,
/// When `true`, the ssl/rustls listener methods skip their HTTP/2 ALPN setup.
no_http2: bool,
/// When `true`, the server does not subscribe to process signals.
no_signals: bool,
}
/// Commands delivered to the `HttpServer` actor through its command stream.
pub(crate) enum ServerCommand {
/// The worker with this index has died; the server starts a replacement.
WorkerDied(usize),
}
/// `HttpServer` is an actix actor that runs in a plain `Context`.
impl<H> Actor for HttpServer<H>
where
H: IntoHttpHandler,
{
type Context = Context<Self>;
}
/// A listener registered with the server, together with how its streams are handled.
pub(crate) struct Socket {
/// The listening socket itself.
pub lst: net::TcpListener,
/// Local address of `lst`, captured when the socket was registered.
pub addr: net::SocketAddr,
/// Stream handling for accepted connections (plain or one of the TLS variants).
pub tp: StreamHandlerType,
}
impl<H> HttpServer<H>
where
    H: IntoHttpHandler + 'static,
{
    /// Creates a server whose handlers are produced by `factory`.
    ///
    /// `factory` is stored and invoked each time a worker is started. The
    /// server starts with `num_cpus::get()` workers, a backlog of 2048,
    /// `KeepAlive::Os` and a 30 second shutdown timeout.
    pub fn new<F, U>(factory: F) -> Self
    where
        F: Fn() -> U + Sync + Send + 'static,
        U: IntoIterator<Item = H> + 'static,
    {
        let f = move || (factory)().into_iter().collect();

        HttpServer {
            h: None,
            threads: num_cpus::get(),
            backlog: 2048,
            host: None,
            keep_alive: KeepAlive::Os,
            factory: Arc::new(f),
            workers: Vec::new(),
            sockets: Vec::new(),
            accept: None,
            exit: false,
            shutdown_timeout: 30,
            signals: None,
            no_http2: false,
            no_signals: false,
        }
    }

    /// Sets the number of workers to start.
    pub fn workers(mut self, num: usize) -> Self {
        self.threads = num;
        self
    }

    #[doc(hidden)]
    #[deprecated(
        since = "0.6.0",
        note = "please use `HttpServer::workers()` instead"
    )]
    pub fn threads(self, num: usize) -> Self {
        self.workers(num)
    }

    /// Sets the listen backlog used for sockets created by the `bind*` methods.
    ///
    /// The value is read at bind time, so it only affects listeners bound
    /// after this call.
    pub fn backlog(mut self, num: i32) -> Self {
        self.backlog = num;
        self
    }

    /// Sets the keep-alive setting handed to every worker.
    pub fn keep_alive<T: Into<KeepAlive>>(mut self, val: T) -> Self {
        self.keep_alive = val.into();
        self
    }

    /// Sets the host name forwarded to the server settings.
    pub fn server_hostname(mut self, val: String) -> Self {
        self.host = Some(val);
        self
    }

    /// Requests that the actix `System` be stopped once the server has stopped.
    pub fn system_exit(mut self) -> Self {
        self.exit = true;
        self
    }

    /// Subscribes to `addr` for process signals instead of the signal actor
    /// taken from the system registry.
    pub fn signals(mut self, addr: Addr<signal::ProcessSignals>) -> Self {
        self.signals = Some(addr);
        self
    }

    /// Disables subscription to process signals.
    pub fn disable_signals(mut self) -> Self {
        self.no_signals = true;
        self
    }

    /// Sets the graceful-stop timeout, in seconds, passed to workers.
    pub fn shutdown_timeout(mut self, sec: u16) -> Self {
        self.shutdown_timeout = sec;
        self
    }

    /// Disables the HTTP/2 ALPN setup for ssl/rustls listeners added after
    /// this call.
    pub fn no_http2(mut self) -> Self {
        self.no_http2 = true;
        self
    }

    /// Returns the local addresses of all registered listeners.
    pub fn addrs(&self) -> Vec<net::SocketAddr> {
        self.sockets.iter().map(|s| s.addr).collect()
    }

    /// Returns the local address of every registered listener paired with the
    /// scheme reported by its stream handler type.
    pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> {
        self.sockets
            .iter()
            .map(|s| (s.addr, s.tp.scheme()))
            .collect()
    }

    /// Registers an already bound listener for plain connections.
    ///
    /// # Panics
    ///
    /// Panics if the local address of `lst` cannot be determined.
    pub fn listen(mut self, lst: net::TcpListener) -> Self {
        let addr = lst.local_addr().unwrap();
        self.sockets.push(Socket {
            addr,
            lst,
            tp: StreamHandlerType::Normal,
        });
        self
    }

    /// Registers an already bound listener whose connections are handled with
    /// `acceptor`.
    ///
    /// # Panics
    ///
    /// Panics if the local address of `lst` cannot be determined.
    #[cfg(feature = "tls")]
    pub fn listen_tls(mut self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self {
        let addr = lst.local_addr().unwrap();
        self.sockets.push(Socket {
            addr,
            lst,
            // `acceptor` is owned and not used again, so it is moved, not cloned.
            tp: StreamHandlerType::Tls(acceptor),
        });
        self
    }

    /// Registers an already bound listener served through the acceptor built
    /// from `builder`. ALPN is configured first unless HTTP/2 was disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if ALPN configuration fails or the local address of
    /// `lst` cannot be determined.
    #[cfg(feature = "alpn")]
    pub fn listen_ssl(
        mut self, lst: net::TcpListener, mut builder: SslAcceptorBuilder,
    ) -> io::Result<Self> {
        if !self.no_http2 {
            configure_alpn(&mut builder)?;
        }
        let acceptor = builder.build();
        // The method already returns `io::Result`, so propagate instead of panicking.
        let addr = lst.local_addr()?;
        self.sockets.push(Socket {
            addr,
            lst,
            tp: StreamHandlerType::Alpn(acceptor),
        });
        Ok(self)
    }

    /// Registers an already bound listener served with the rustls
    /// configuration `builder`. The `h2` and `http/1.1` protocols are set on
    /// it unless HTTP/2 was disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if the local address of `lst` cannot be determined.
    #[cfg(feature = "rust-tls")]
    pub fn listen_rustls(
        mut self, lst: net::TcpListener, mut builder: ServerConfig,
    ) -> io::Result<Self> {
        if !self.no_http2 {
            builder.set_protocols(&["h2".to_string(), "http/1.1".to_string()]);
        }
        let addr = lst.local_addr()?;
        self.sockets.push(Socket {
            addr,
            lst,
            tp: StreamHandlerType::Rustls(Arc::new(builder)),
        });
        Ok(self)
    }

    /// Creates a plain listener for every address `addr` resolves to.
    ///
    /// Succeeds when at least one listener could be created. Otherwise it
    /// returns the last error seen, or a generic "Can not bind to address."
    /// error when `addr` resolved to nothing.
    fn bind2<S: net::ToSocketAddrs>(&mut self, addr: S) -> io::Result<Vec<Socket>> {
        let mut last_err = None;
        let mut sockets = Vec::new();

        for addr in addr.to_socket_addrs()? {
            let bound = create_tcp_listener(addr, self.backlog).and_then(|lst| {
                // A listener whose address cannot be read is treated as a failed bind.
                let addr = lst.local_addr()?;
                Ok(Socket {
                    lst,
                    addr,
                    tp: StreamHandlerType::Normal,
                })
            });
            match bound {
                Ok(socket) => sockets.push(socket),
                Err(e) => last_err = Some(e),
            }
        }

        if sockets.is_empty() {
            Err(last_err.unwrap_or_else(|| {
                io::Error::new(io::ErrorKind::Other, "Can not bind to address.")
            }))
        } else {
            Ok(sockets)
        }
    }

    /// Binds plain listeners to every address `addr` resolves to.
    ///
    /// # Errors
    ///
    /// Returns an error if no listener could be created.
    pub fn bind<S: net::ToSocketAddrs>(mut self, addr: S) -> io::Result<Self> {
        let sockets = self.bind2(addr)?;
        self.sockets.extend(sockets);
        Ok(self)
    }

    /// Binds listeners to `addr` whose connections are handled with `acceptor`.
    ///
    /// # Errors
    ///
    /// Returns an error if no listener could be created.
    #[cfg(feature = "tls")]
    pub fn bind_tls<S: net::ToSocketAddrs>(
        mut self, addr: S, acceptor: TlsAcceptor,
    ) -> io::Result<Self> {
        let sockets = self.bind2(addr)?;
        self.sockets.extend(sockets.into_iter().map(|mut s| {
            s.tp = StreamHandlerType::Tls(acceptor.clone());
            s
        }));
        Ok(self)
    }

    /// Binds listeners to `addr` served through the acceptor built from
    /// `builder`. ALPN is configured first unless HTTP/2 was disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if ALPN configuration fails or no listener could be
    /// created.
    #[cfg(feature = "alpn")]
    pub fn bind_ssl<S: net::ToSocketAddrs>(
        mut self, addr: S, mut builder: SslAcceptorBuilder,
    ) -> io::Result<Self> {
        if !self.no_http2 {
            configure_alpn(&mut builder)?;
        }
        let acceptor = builder.build();
        let sockets = self.bind2(addr)?;
        self.sockets.extend(sockets.into_iter().map(|mut s| {
            s.tp = StreamHandlerType::Alpn(acceptor.clone());
            s
        }));
        Ok(self)
    }

    /// Binds listeners to `addr` served with the rustls configuration
    /// `builder`. The `h2` and `http/1.1` protocols are set on it unless
    /// HTTP/2 was disabled.
    ///
    /// # Errors
    ///
    /// Returns an error if no listener could be created.
    #[cfg(feature = "rust-tls")]
    pub fn bind_rustls<S: net::ToSocketAddrs>(
        mut self, addr: S, mut builder: ServerConfig,
    ) -> io::Result<Self> {
        if !self.no_http2 {
            builder.set_protocols(&["h2".to_string(), "http/1.1".to_string()]);
        }
        let builder = Arc::new(builder);
        let sockets = self.bind2(addr)?;
        self.sockets.extend(sockets.into_iter().map(move |mut s| {
            s.tp = StreamHandlerType::Rustls(builder.clone());
            s
        }));
        Ok(self)
    }

    /// Starts `self.threads` workers through `Arbiter::start`.
    ///
    /// Each worker's address is recorded in `self.workers`. The returned
    /// `(index, sender)` pairs are the channels on which connections are
    /// delivered to the workers.
    fn start_workers(
        &mut self, settings: &ServerSettings, sockets: &Slab<SocketInfo>,
    ) -> Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)> {
        let mut workers = Vec::new();
        for idx in 0..self.threads {
            let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();

            // Everything the worker closure needs is captured by value.
            let ka = self.keep_alive;
            let socks = sockets.clone();
            let factory = Arc::clone(&self.factory);
            let parts = settings.parts();

            let addr = Arbiter::start(move |ctx: &mut Context<_>| {
                let s = ServerSettings::from_parts(parts);
                let apps: Vec<_> =
                    (*factory)().into_iter().map(|h| h.into_handler()).collect();
                ctx.add_message_stream(rx);
                Worker::new(apps, socks, ka, s)
            });
            workers.push((idx, tx));
            self.workers.push((idx, addr));
        }
        info!("Starting {} http workers", self.threads);
        workers
    }

    /// Returns the signal actor to subscribe to, or `None` when signal
    /// handling is disabled.
    ///
    /// An explicitly configured address wins; otherwise the actor is looked
    /// up in the current system's registry.
    fn subscribe_to_signals(&self) -> Option<Addr<signal::ProcessSignals>> {
        if self.no_signals {
            return None;
        }
        Some(match self.signals {
            Some(ref signals) => signals.clone(),
            None => System::current().registry().get::<signal::ProcessSignals>(),
        })
    }
}
impl<H: IntoHttpHandler> HttpServer<H> {
    /// Starts the workers and the accept thread, then runs the server as an
    /// actor and returns its address.
    ///
    /// Unless signal handling is disabled, the new actor is subscribed to
    /// process signals.
    ///
    /// # Panics
    ///
    /// Panics if no listener has been registered.
    pub fn start(mut self) -> Addr<Self> {
        if self.sockets.is_empty() {
            panic!("HttpServer::bind() has to be called before start()");
        }

        let (tx, rx) = mpsc::unbounded();

        // Give every listener a slab token; workers get the socket info,
        // the accept thread gets the sockets themselves.
        let mut socks = Slab::new();
        let mut addrs: Vec<(usize, Socket)> = Vec::new();
        for socket in self.sockets.drain(..) {
            let entry = socks.vacant_entry();
            let token = entry.key();
            entry.insert(SocketInfo {
                addr: socket.addr,
                htype: socket.tp.clone(),
            });
            addrs.push((token, socket));
        }

        let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
        let workers = self.start_workers(&settings, &socks);

        for (_, sock) in &addrs {
            // Log the scheme of the listener rather than assuming plain http.
            info!("Starting server on {}://{}", sock.tp.scheme(), sock.addr);
        }

        let (r, cmd) = start_accept_thread(addrs, tx, workers);
        self.accept = Some((r, cmd, socks));

        let signals = self.subscribe_to_signals();
        let addr = Actor::create(move |ctx| {
            ctx.add_stream(rx);
            self
        });
        if let Some(signals) = signals {
            signals.do_send(signal::Subscribe(addr.clone().recipient()))
        }
        addr
    }

    /// Creates a new actix `System` named `http-server`, starts the server in
    /// it and runs the system.
    pub fn run(self) {
        let sys = System::new("http-server");
        self.start();
        sys.run();
    }
}
#[doc(hidden)]
#[cfg(feature = "tls")]
#[deprecated(
    since = "0.6.0",
    note = "please use `actix_web::HttpServer::bind_tls` instead"
)]
impl<H: IntoHttpHandler> HttpServer<H> {
    /// Switches every plain listener to TLS with `acceptor` and starts the
    /// server. Listeners that already have a non-plain type are left as is.
    pub fn start_tls(mut self, acceptor: TlsAcceptor) -> io::Result<Addr<Self>> {
        for socket in self.sockets.iter_mut() {
            if let StreamHandlerType::Normal = socket.tp {
                socket.tp = StreamHandlerType::Tls(acceptor.clone());
            }
        }
        Ok(self.start())
    }
}
#[doc(hidden)]
#[cfg(feature = "alpn")]
#[deprecated(
    since = "0.6.0",
    note = "please use `actix_web::HttpServer::bind_ssl` instead"
)]
impl<H: IntoHttpHandler> HttpServer<H> {
    /// Switches every plain listener to the acceptor built from `builder` and
    /// starts the server. Listeners that already have a non-plain type are
    /// left as is.
    ///
    /// # Errors
    ///
    /// Returns an error if ALPN configuration fails.
    pub fn start_ssl(
        mut self, mut builder: SslAcceptorBuilder,
    ) -> io::Result<Addr<Self>> {
        // Same ALPN setup as `bind_ssl`/`listen_ssl`, via the shared helper.
        if !self.no_http2 {
            configure_alpn(&mut builder)?;
        }

        let acceptor = builder.build();
        for sock in &mut self.sockets {
            if let StreamHandlerType::Normal = sock.tp {
                sock.tp = StreamHandlerType::Alpn(acceptor.clone());
            }
        }
        Ok(self.start())
    }
}
impl<H: IntoHttpHandler> HttpServer<H> {
    /// Starts the server on a caller-supplied stream of connections instead
    /// of its own listeners.
    ///
    /// Handlers are built once from the factory and stored on the server
    /// itself. Every item of `stream` is wrapped into a `Conn` and delivered
    /// to this actor. `secure` is forwarded to the server settings, which are
    /// created with the fixed address `127.0.0.1:8080`.
    pub fn start_incoming<T, S>(mut self, stream: S, secure: bool) -> Addr<Self>
    where
        S: Stream<Item = T, Error = io::Error> + Send + 'static,
        T: AsyncRead + AsyncWrite + Send + 'static,
    {
        let local = "127.0.0.1:8080".parse::<net::SocketAddr>().unwrap();
        let settings = ServerSettings::new(Some(local), &self.host, secure);

        let handlers: Vec<_> = (*self.factory)()
            .into_iter()
            .map(|h| h.into_handler())
            .collect();
        let worker_settings = WorkerSettings::new(handlers, self.keep_alive, settings);
        self.h = Some(Rc::new(worker_settings));

        let signals = self.subscribe_to_signals();

        // Stream errors are discarded; each item becomes a `Conn` message.
        let connections = stream.map_err(|_| ()).map(|io| Conn {
            io: WrapperStream::new(io),
            token: 0,
            peer: None,
            http2: false,
        });
        let addr = HttpServer::create(move |ctx| {
            ctx.add_message_stream(connections);
            self
        });

        if let Some(signals) = signals {
            signals.do_send(signal::Subscribe(addr.clone().recipient()))
        }
        addr
    }
}
/// Process-signal handling: `SIGINT` and `SIGQUIT` stop the server
/// immediately, `SIGTERM` stops it gracefully; other signals are ignored.
/// Each of the three also marks the server to stop the `System` afterwards.
impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H> {
    type Result = ();

    fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
        let graceful = match msg.0 {
            signal::SignalType::Int => {
                info!("SIGINT received, exiting");
                false
            }
            signal::SignalType::Term => {
                info!("SIGTERM received, stopping");
                true
            }
            signal::SignalType::Quit => {
                info!("SIGQUIT received, exiting");
                false
            }
            _ => return,
        };

        self.exit = true;
        Handler::<StopServer>::handle(self, StopServer { graceful }, ctx);
    }
}
/// Handles commands arriving on the server's command stream.
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
    fn finished(&mut self, _: &mut Context<Self>) {}

    /// On `WorkerDied`, removes the dead worker from the list (if it is
    /// known), starts a replacement under an unused index and registers the
    /// replacement with the accept thread.
    fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
        match msg {
            ServerCommand::WorkerDied(idx) => {
                // Unknown indices are ignored.
                let dead = match self.workers.iter().position(|w| w.0 == idx) {
                    Some(pos) => pos,
                    None => return,
                };
                self.workers.swap_remove(dead);
                error!("Worker has died {:?}, restarting", idx);

                let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();

                // Start at the current worker count and skip indices still in use.
                let mut new_idx = self.workers.len();
                while self.workers.iter().any(|w| w.0 == new_idx) {
                    new_idx += 1;
                }

                let ka = self.keep_alive;
                let factory = Arc::clone(&self.factory);
                let host = self.host.clone();
                let socks = self.accept.as_ref().unwrap().2.clone();
                let first_addr = socks[0].addr;

                let worker = Arbiter::start(move |ctx: &mut Context<_>| {
                    let settings = ServerSettings::new(Some(first_addr), &host, false);
                    let apps: Vec<_> =
                        (*factory)().into_iter().map(|h| h.into_handler()).collect();
                    ctx.add_message_stream(rx);
                    Worker::new(apps, socks, ka, settings)
                });

                // Hand the new worker's channel to the accept thread and wake it.
                if let Some(ref accept) = self.accept {
                    let _ = accept.1.send(Command::Worker(new_idx, tx.clone()));
                    let _ = accept.0.set_readiness(mio::Ready::readable());
                }

                self.workers.push((new_idx, worker));
            }
        }
    }
}
/// Serves a connection delivered directly to the server actor by spawning an
/// `HttpChannel` for it on the current arbiter.
///
/// Panics if the server's worker settings have not been set.
impl<T, H> Handler<Conn<T>> for HttpServer<H>
where
    T: IoStream,
    H: IntoHttpHandler,
{
    type Result = ();

    fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
        let settings = Rc::clone(self.h.as_ref().unwrap());
        let channel = HttpChannel::new(settings, msg.io, msg.peer, msg.http2);
        Arbiter::spawn(channel);
    }
}
/// Sends `Command::Pause` to the accept thread, if one is running, and wakes it.
impl<H: IntoHttpHandler> Handler<PauseServer> for HttpServer<H> {
    type Result = ();

    fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
        if let Some(ref accept) = self.accept {
            // Failures are ignored on purpose: this is best effort.
            let _ = accept.1.send(Command::Pause);
            let _ = accept.0.set_readiness(mio::Ready::readable());
        }
    }
}
/// Sends `Command::Resume` to the accept thread, if one is running, and wakes it.
impl<H: IntoHttpHandler> Handler<ResumeServer> for HttpServer<H> {
    type Result = ();

    fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
        if let Some(ref accept) = self.accept {
            // Failures are ignored on purpose: this is best effort.
            let _ = accept.1.send(Command::Resume);
            let _ = accept.0.set_readiness(mio::Ready::readable());
        }
    }
}
/// Stops the accept thread and asks every worker to stop.
///
/// With no workers the reply is immediate. Otherwise the reply is an async
/// response tied to the completion channel created here. When the server is
/// marked to exit, the `System` is stopped 300 ms after the last worker has
/// been handled.
impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H> {
    type Result = Response<(), ()>;

    fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
        // Tell the accept thread to stop and wake it so it sees the command.
        if let Some(ref accept) = self.accept {
            let _ = accept.1.send(Command::Stop);
            let _ = accept.0.set_readiness(mio::Ready::readable());
        }

        let (done_tx, done_rx) = mpsc::channel(1);

        // A graceful stop gives workers the configured timeout; otherwise none.
        let timeout = if msg.graceful {
            Some(Duration::new(u64::from(self.shutdown_timeout), 0))
        } else {
            None
        };

        for &(_, ref worker) in &self.workers {
            let tx2 = done_tx.clone();
            let stopping = worker
                .send(StopWorker { graceful: timeout })
                .into_actor(self)
                .then(move |_, slf, ctx| {
                    // One worker accounted for, whatever the outcome was.
                    slf.workers.pop();
                    if slf.workers.is_empty() {
                        // NOTE(review): `Sink::send` in futures 0.1 appears to
                        // return a lazy future that is dropped here unpolled;
                        // completion is then presumably signalled by the
                        // senders being dropped rather than by this item.
                        // Confirm.
                        let _ = tx2.send(());
                        if slf.exit {
                            ctx.run_later(Duration::from_millis(300), |_, _| {
                                System::current().stop();
                            });
                        }
                    }
                    fut::ok(())
                });
            ctx.spawn(stopping);
        }

        if self.workers.is_empty() {
            if self.exit {
                ctx.run_later(Duration::from_millis(300), |_, _| {
                    System::current().stop();
                });
            }
            Response::reply(Ok(()))
        } else {
            Response::async(done_rx.into_future().map(|_| ()).map_err(|_| ()))
        }
    }
}
/// Creates a TCP listener bound to `addr` with address reuse enabled and the
/// given listen `backlog`. The socket family follows the family of `addr`.
fn create_tcp_listener(
    addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
    let builder = if addr.is_ipv4() {
        TcpBuilder::new_v4()?
    } else {
        TcpBuilder::new_v6()?
    };
    builder.reuse_address(true)?;
    builder.bind(addr)?;
    builder.listen(backlog)
}