use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::task::{ready, Context, Poll};
use std::{future::Future, io, pin::Pin};
use async_channel::Sender;
use crate::{manager::ServerCommand, signals::Signal};
/// State shared by every `Server` handle through an `Arc`.
#[derive(Debug)]
pub(crate) struct ServerShared {
/// While `true`, `Server::process` hands items back to the caller instead
/// of queueing them. This file only reads the flag; it is presumably
/// toggled by the component that handles pause/resume commands — confirm.
pub(crate) paused: AtomicBool,
}
/// Handle used to control a running server and to submit work items of
/// type `T` to it.
///
/// The handle can be cloned, and it can be awaited as a `Future` that
/// resolves once the server reports that it has stopped.
#[derive(Debug)]
pub struct Server<T> {
/// State shared between all handles (currently only the paused flag).
shared: Arc<ServerShared>,
/// Sending half of the command channel used to talk to the server.
cmd: Sender<ServerCommand<T>>,
/// Receiver for the "stopped" notification. It starts out as `None` and is
/// filled in the first time this handle is polled as a future.
stop: Option<oneshot::Receiver<()>>,
}
impl<T> Server<T> {
    /// Creates a handle from the command channel sender and the shared state.
    ///
    /// The handle starts without a registered stop notification.
    pub(crate) fn new(cmd: Sender<ServerCommand<T>>, shared: Arc<ServerShared>) -> Self {
        Server {
            cmd,
            shared,
            stop: None,
        }
    }

    /// Returns a `ServerBuilder` in its default configuration.
    pub fn build() -> crate::net::ServerBuilder {
        crate::net::ServerBuilder::default()
    }

    /// Queues `sig` as a `ServerCommand::Signal`.
    ///
    /// This is best-effort: a failure to queue the command is ignored.
    pub(crate) fn signal(&self, sig: Signal) {
        let _ = self.cmd.try_send(ServerCommand::Signal(sig));
    }

    /// Submits `item` to the server.
    ///
    /// # Errors
    ///
    /// The item is handed back as `Err(item)` when the shared paused flag is
    /// set, or when the command channel refuses the command.
    pub fn process(&self, item: T) -> Result<(), T> {
        if self.shared.paused.load(Ordering::Acquire) {
            return Err(item);
        }

        match self.cmd.try_send(ServerCommand::Item(item)) {
            Ok(_) => Ok(()),
            // A failed send returns ownership of the rejected command, which
            // lets us recover the caller's item.
            Err(err) => match err.into_inner() {
                ServerCommand::Item(item) => Err(item),
                _ => unreachable!("a failed send returns the command that was submitted"),
            },
        }
    }

    /// Asks the server to pause.
    ///
    /// The command is queued immediately, before the returned future is
    /// polled. The future resolves once the paired oneshot receiver
    /// completes, whether with a reply or because the sender was dropped
    /// (which also happens when the command could not be queued).
    pub fn pause(&self) -> impl Future<Output = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.cmd.try_send(ServerCommand::Pause(tx));
        async move {
            let _ = rx.await;
        }
    }

    /// Asks the server to resume.
    ///
    /// The command is queued immediately, before the returned future is
    /// polled. The future resolves once the paired oneshot receiver
    /// completes, whether with a reply or because the sender was dropped.
    pub fn resume(&self) -> impl Future<Output = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.cmd.try_send(ServerCommand::Resume(tx));
        async move {
            let _ = rx.await;
        }
    }

    /// Asks the server to stop.
    ///
    /// `graceful` is forwarded unchanged in the `ServerCommand::Stop` command.
    /// The command is queued immediately, before the returned future is
    /// polled. The future resolves once the paired oneshot receiver
    /// completes, whether with a reply or because the sender was dropped.
    pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
        let (tx, rx) = oneshot::channel();
        let _ = self.cmd.try_send(ServerCommand::Stop {
            graceful,
            completion: Some(tx),
        });
        async move {
            let _ = rx.await;
        }
    }
}
impl<T> Clone for Server<T> {
fn clone(&self) -> Self {
Self {
cmd: self.cmd.clone(),
shared: self.shared.clone(),
stop: None,
}
}
}
/// Awaiting a `Server` handle waits for the server to report that it has
/// stopped.
impl<T> Future for Server<T> {
    type Output = io::Result<()>;

    /// On the first poll a `ServerCommand::NotifyStopped` command is queued
    /// and its receiver is stored; that receiver is then polled on this and
    /// every later call.
    ///
    /// Resolves with `Ok(())` as soon as the receiver completes, whatever its
    /// result, or immediately if the command could not be queued.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let server = self.get_mut();

        let receiver = match server.stop {
            Some(ref mut pending) => pending,
            None => {
                let (notify, waiter) = oneshot::channel();
                if server
                    .cmd
                    .try_send(ServerCommand::NotifyStopped(notify))
                    .is_err()
                {
                    // The command cannot be delivered, so there is nothing
                    // to wait for.
                    return Poll::Ready(Ok(()));
                }
                server.stop.insert(waiter)
            }
        };

        // Both a delivered notification and a dropped sender count as "stopped".
        let _ = ready!(Pin::new(receiver).poll(cx));
        Poll::Ready(Ok(()))
    }
}