1use std::sync::Arc;
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use futures_util::lock::Mutex;
5
6use crate::{AsyncRead, AsyncWrite, IoResult};
7
8pub fn split<T: AsyncRead + AsyncWrite>(stream: T) -> (ReadHalf<T>, WriteHalf<T>) {
11 let stream = Arc::new(Mutex::new(stream));
12 (ReadHalf(stream.clone()), WriteHalf(stream))
13}
14
15#[derive(Debug)]
17pub struct ReadHalf<T>(Arc<Mutex<T>>);
18
19impl<T: Unpin> ReadHalf<T> {
20 #[track_caller]
29 pub fn unsplit(self, w: WriteHalf<T>) -> T {
30 if Arc::ptr_eq(&self.0, &w.0) {
31 drop(w);
32 let inner = Arc::try_unwrap(self.0).expect("`Arc::try_unwrap` failed");
33 inner.into_inner()
34 } else {
35 #[cold]
36 fn panic_unrelated() -> ! {
37 panic!("Unrelated `WriteHalf` passed to `ReadHalf::unsplit`.")
38 }
39
40 panic_unrelated()
41 }
42 }
43}
44
45impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
46 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
47 self.0.lock().await.read(buf).await
48 }
49
50 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
51 self.0.lock().await.read_vectored(buf).await
52 }
53}
54
55#[derive(Debug)]
57pub struct WriteHalf<T>(Arc<Mutex<T>>);
58
59impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
60 async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
61 self.0.lock().await.write(buf).await
62 }
63
64 async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
65 self.0.lock().await.write_vectored(buf).await
66 }
67
68 async fn flush(&mut self) -> IoResult<()> {
69 self.0.lock().await.flush().await
70 }
71
72 async fn shutdown(&mut self) -> IoResult<()> {
73 self.0.lock().await.shutdown().await
74 }
75}