// postcard_rpc/server/impls/test_channels.rs
use core::{
convert::Infallible,
future::{pending, Future},
sync::atomic::{AtomicU32, Ordering},
};
use std::sync::Arc;
use crate::{
header::{VarHeader, VarKey, VarKeyKind, VarSeq},
host_client::util::Stopper,
server::{
AsWireRxErrorKind, AsWireTxErrorKind, WireRx, WireRxErrorKind, WireSpawn, WireTx,
WireTxErrorKind,
},
standard_icd::LoggingTopic,
Topic,
};
use core::fmt::Arguments;
use tokio::{select, sync::mpsc};
/// Type aliases and constructors that plug the channel-backed wire types of this file
/// into [`Server`](crate::server::Server).
pub mod dispatch_impl {
    pub use crate::host_client::util::Stopper;
    use crate::{
        header::VarKeyKind,
        server::{Dispatch, Server},
    };

    pub use super::tokio_spawn as spawn_fn;

    /// Everything needed to build a channel-backed server.
    pub struct Settings {
        /// Sending half of the wire.
        pub tx: WireTxImpl,
        /// Receiving half of the wire.
        pub rx: WireRxImpl,
        /// Length, in bytes, of the zero-filled receive buffer handed to the server.
        pub buf: usize,
        /// Key kind forwarded to `Server::new`.
        pub kkind: VarKeyKind,
    }

    /// Wire sender used by the channel-backed server.
    pub type WireTxImpl = super::ChannelWireTx;
    /// Wire receiver used by the channel-backed server.
    pub type WireRxImpl = super::ChannelWireRx;
    /// Spawner type used by the channel-backed server.
    pub type WireSpawnImpl = super::ChannelWireSpawn;
    /// Heap-allocated receive buffer owned by the server.
    pub type WireRxBuf = Box<[u8]>;

    /// Builds a server from `dispatch` and `settings`.
    ///
    /// A receive buffer of `settings.buf` zeroed bytes is allocated and moved into the
    /// server together with the wire halves and the key kind.
    pub fn new_server<D>(
        dispatch: D,
        settings: Settings,
    ) -> Server<WireTxImpl, WireRxImpl, WireRxBuf, D>
    where
        D: Dispatch<Tx = WireTxImpl>,
    {
        let Settings { tx, rx, buf, kkind } = settings;
        let rx_buf: WireRxBuf = vec![0; buf].into_boxed_slice();
        Server::new(tx, rx, rx_buf, dispatch, kkind)
    }

    /// Builds a server exactly like [`new_server`], but first installs a freshly created
    /// [`Stopper`] on both wire halves.
    ///
    /// Returns the server together with the stopper that was installed.
    pub fn new_server_stoppable<D>(
        dispatch: D,
        mut settings: Settings,
    ) -> (Server<WireTxImpl, WireRxImpl, WireRxBuf, D>, Stopper)
    where
        D: Dispatch<Tx = WireTxImpl>,
    {
        let stopper = Stopper::new();
        // Both halves get their own clone of the same stopper.
        settings.tx.set_stopper(stopper.clone());
        settings.rx.set_stopper(stopper.clone());
        (new_server(dispatch, settings), stopper)
    }
}
/// Sending half of a channel-backed wire: every outgoing frame is pushed into a tokio
/// `mpsc::Sender` as an owned `Vec<u8>`.
///
/// Cloning is cheap; clones share the same log sequence counter because it is stored
/// behind an `Arc`.
#[derive(Clone)]
pub struct ChannelWireTx {
    /// Channel that outgoing frames are pushed into.
    tx: mpsc::Sender<Vec<u8>>,
    /// Counter that supplies the sequence number of outgoing log frames.
    log_ctr: Arc<AtomicU32>,
    /// Optional stop signal; when present, a pending send is abandoned once it fires.
    stopper: Option<Stopper>,
}

impl ChannelWireTx {
    /// Wraps `tx` with a log counter starting at zero and no stopper installed.
    pub fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
        let log_ctr = Arc::new(AtomicU32::new(0));
        Self {
            tx,
            log_ctr,
            stopper: None,
        }
    }

    /// Installs `stopper`, replacing any stopper set previously.
    pub fn set_stopper(&mut self, stopper: Stopper) {
        self.stopper = Some(stopper);
    }

    /// Pushes one already-serialized frame into the channel.
    ///
    /// The send is raced against the stopper (if any). Both outcomes — the stopper
    /// firing first, or the channel rejecting the frame — are reported as
    /// [`ChannelWireTxError::ChannelClosed`].
    async fn inner_send(&self, msg: Vec<u8>) -> Result<(), ChannelWireTxError> {
        let stopped = async {
            match &self.stopper {
                Some(stopper) => {
                    stopper.wait_stopped().await;
                }
                // Without a stopper this branch must never win the race.
                None => pending::<()>().await,
            }
        };

        select! {
            _ = stopped => Err(ChannelWireTxError::ChannelClosed),
            sent = self.tx.send(msg) => sent.map_err(|_| ChannelWireTxError::ChannelClosed),
        }
    }
}
impl WireTx for ChannelWireTx {
type Error = ChannelWireTxError;
async fn send<T: serde::Serialize + ?Sized>(
&self,
hdr: crate::header::VarHeader,
msg: &T,
) -> Result<(), Self::Error> {
let mut hdr_ser = hdr.write_to_vec();
let bdy_ser = postcard::to_stdvec(msg).unwrap();
hdr_ser.extend_from_slice(&bdy_ser);
self.inner_send(hdr_ser).await
}
async fn send_raw(&self, buf: &[u8]) -> Result<(), Self::Error> {
let buf = buf.to_vec();
self.inner_send(buf).await
}
async fn send_log_str(&self, kkind: VarKeyKind, s: &str) -> Result<(), Self::Error> {
let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
let key = match kkind {
VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
};
let wh = VarHeader {
key,
seq_no: VarSeq::Seq4(ctr),
};
let msg = s.to_string();
self.send::<<LoggingTopic as Topic>::Message>(wh, &msg)
.await
}
async fn send_log_fmt<'a>(
&self,
kkind: VarKeyKind,
a: Arguments<'a>,
) -> Result<(), Self::Error> {
let ctr = self.log_ctr.fetch_add(1, Ordering::Relaxed);
let key = match kkind {
VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
};
let wh = VarHeader {
key,
seq_no: VarSeq::Seq4(ctr),
};
let mut buf = wh.write_to_vec();
let msg = format!("{a}");
let msg = postcard::to_stdvec(&msg).unwrap();
buf.extend_from_slice(&msg);
self.inner_send(buf).await
}
}
/// Error produced by the sending half of the channel-backed wire.
///
/// Implements [`core::fmt::Display`] and [`std::error::Error`] so it can be boxed as
/// `dyn Error` or propagated with `?`, and `Clone`/`PartialEq`/`Eq` so it can be
/// compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelWireTxError {
    /// The frame could not be delivered: either the channel rejected the send or the
    /// stopper fired while the send was pending.
    ChannelClosed,
}

impl core::fmt::Display for ChannelWireTxError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::ChannelClosed => f.write_str("channel closed"),
        }
    }
}

impl std::error::Error for ChannelWireTxError {}
impl AsWireTxErrorKind for ChannelWireTxError {
    /// Maps this error onto the generic transmit error kind; a closed channel is
    /// reported as a closed connection.
    fn as_kind(&self) -> WireTxErrorKind {
        match self {
            Self::ChannelClosed => WireTxErrorKind::ConnectionClosed,
        }
    }
}
/// Receiving half of a channel-backed wire: incoming frames are pulled from a tokio
/// `mpsc::Receiver` as owned `Vec<u8>` values.
pub struct ChannelWireRx {
    /// Channel that incoming frames are read from.
    rx: mpsc::Receiver<Vec<u8>>,
    /// Optional stop signal; when present, a pending receive is abandoned once it fires.
    stopper: Option<Stopper>,
}

impl ChannelWireRx {
    /// Wraps `rx` with no stopper installed.
    pub fn new(rx: mpsc::Receiver<Vec<u8>>) -> Self {
        Self { stopper: None, rx }
    }

    /// Installs `stopper`, replacing any stopper set previously.
    pub fn set_stopper(&mut self, stopper: Stopper) {
        self.stopper = Some(stopper);
    }
}
impl WireRx for ChannelWireRx {
    type Error = ChannelWireRxError;

    /// Waits for the next frame, copies it into the front of `buf`, and returns the
    /// filled prefix.
    ///
    /// # Errors
    ///
    /// * [`ChannelWireRxError::ChannelClosed`] if the stopper fires first or the channel
    ///   yields no more frames.
    /// * [`ChannelWireRxError::MessageTooLarge`] if the frame is longer than `buf`.
    async fn receive<'a>(&mut self, buf: &'a mut [u8]) -> Result<&'a mut [u8], Self::Error> {
        // Split the borrow so the stop future and the receive future can coexist.
        let Self { rx, stopper } = self;

        let stopped = async {
            match stopper.as_ref() {
                Some(signal) => {
                    signal.wait_stopped().await;
                }
                // Without a stopper this branch must never win the race.
                None => pending::<()>().await,
            }
        };

        select! {
            _ = stopped => Err(ChannelWireRxError::ChannelClosed),
            incoming = rx.recv() => match incoming {
                None => Err(ChannelWireRxError::ChannelClosed),
                Some(frame) => match buf.get_mut(..frame.len()) {
                    Some(dst) => {
                        dst.copy_from_slice(&frame);
                        Ok(dst)
                    }
                    None => Err(ChannelWireRxError::MessageTooLarge),
                },
            },
        }
    }
}
/// Error produced by the receiving half of the channel-backed wire.
///
/// Implements [`core::fmt::Display`] and [`std::error::Error`] so it can be boxed as
/// `dyn Error` or propagated with `?`, and `Clone`/`PartialEq`/`Eq` so it can be
/// compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelWireRxError {
    /// No frame could be received: either the channel yielded nothing more or the
    /// stopper fired while the receive was pending.
    ChannelClosed,
    /// A frame arrived that is longer than the caller-supplied receive buffer.
    MessageTooLarge,
}

impl core::fmt::Display for ChannelWireRxError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let text = match self {
            Self::ChannelClosed => "channel closed",
            Self::MessageTooLarge => "received message is larger than the receive buffer",
        };
        f.write_str(text)
    }
}

impl std::error::Error for ChannelWireRxError {}
impl AsWireRxErrorKind for ChannelWireRxError {
    /// Maps this error onto the generic receive error kind: a closed channel becomes a
    /// closed connection, an oversized frame becomes "received message too large".
    fn as_kind(&self) -> WireRxErrorKind {
        match self {
            Self::ChannelClosed => WireRxErrorKind::ConnectionClosed,
            Self::MessageTooLarge => WireRxErrorKind::ReceivedMessageTooLarge,
        }
    }
}
/// Stateless spawner marker type for the channel-backed wire.
///
/// It carries no data; its `WireSpawn` impl below uses `Infallible` as the error type
/// and `()` as the info type.
#[derive(Clone)]
pub struct ChannelWireSpawn;
/// `WireSpawn` implementation with an uninhabited error type and unit info.
impl WireSpawn for ChannelWireSpawn {
// `Infallible` has no values, so no error of this type can ever be constructed.
type Error = Infallible;
// There is no spawner state to expose.
type Info = ();
/// Returns a reference to the unit value, since this spawner holds no info.
fn info(&self) -> &Self::Info {
&()
}
}
/// Spawns `fut` as a tokio task and returns immediately.
///
/// The spawner argument is only used to pin the generic types; it is never read. The
/// task's join handle is dropped, so the task is not awaited here. This function
/// always returns `Ok(())`.
pub fn tokio_spawn<Sp, F>(_sp: &Sp, fut: F) -> Result<(), Sp::Error>
where
    Sp: WireSpawn<Error = Infallible, Info = ()>,
    F: Future<Output = ()> + Send + 'static,
{
    // Explicitly discard the join handle; nothing waits on the spawned task.
    drop(tokio::task::spawn(fut));
    Ok(())
}