parity_tokio_ipc/
lib.rs

1//! Tokio IPC transport. Under the hood uses Unix Domain Sockets for Linux/Mac
2//! and Named Pipes for Windows.
3
4#![warn(missing_docs)]
5//#![deny(rust_2018_idioms)]
6
7// Use this directly once Rust 1.54 is stabilized; for some reason going
8// indirectly through a macro is okay.
9// See https://github.com/rust-lang/rust/issues/78835
10macro_rules! doc_comment {
11    ($x:expr) => {
12        #[doc = $x]
13        extern {}
14    };
15}
16
17doc_comment!(include_str!("../README.md"));
18
19#[cfg(windows)]
20mod win;
21#[cfg(not(windows))]
22mod unix;
23
24/// Endpoint for IPC transport
25///
26/// # Examples
27///
/// ```ignore
/// use parity_tokio_ipc::{Endpoint, dummy_endpoint};
/// use futures::{future, Future, Stream, StreamExt};
/// use tokio::runtime::Runtime;
///
/// fn main() {
///     let mut runtime = Runtime::new().unwrap();
///     let mut endpoint = Endpoint::new(dummy_endpoint());
///     let server = endpoint.incoming()
///         .expect("failed to open up a new pipe/socket")
///         .for_each(|_stream| {
///             println!("Connection received");
///             futures::future::ready(())
///         });
///     runtime.block_on(server)
/// }
/// ```
45#[cfg(windows)]
46pub use win::{SecurityAttributes, Endpoint, Connection};
47#[cfg(unix)]
48pub use unix::{SecurityAttributes, Endpoint, Connection};
49
50/// For testing/examples
51pub fn dummy_endpoint() -> String {
52	let num: u64 = rand::Rng::gen(&mut rand::thread_rng());
53	if cfg!(windows) {
54		format!(r"\\.\pipe\my-pipe-{}", num)
55	} else {
56		format!(r"/tmp/my-uds-{}", num)
57	}
58}
59
#[cfg(test)]
mod tests {
	use futures::future::{ready, select, Either};
	use futures::{channel::oneshot, FutureExt as _, StreamExt as _};
	use std::path::Path;
	use std::time::Duration;
	use tokio::io::{split, AsyncReadExt, AsyncWriteExt};

	use super::{dummy_endpoint, Endpoint, SecurityAttributes};

	/// Payload each client sends and expects to be echoed back.
	const ECHO_MSG: &[u8] = b"hello";
	/// Number of bytes the server reads per connection; tied to `ECHO_MSG`
	/// so the server and the clients cannot drift apart.
	const ECHO_LEN: usize = ECHO_MSG.len();

	/// Listens on `path` and, for every accepted connection, reads exactly
	/// `ECHO_LEN` bytes and writes them back.
	///
	/// Connections are served one at a time, in the order they are accepted.
	/// Any failure (opening the endpoint, accepting, reading, writing) panics
	/// with a message describing the failed step.
	async fn run_server(path: String) {
		// `path` is already owned, so it is handed over without cloning.
		let mut endpoint = Endpoint::new(path);

		endpoint.set_security_attributes(
			SecurityAttributes::empty()
				.set_mode(0o777)
				.expect("failed to set mode on security attributes"),
		);
		let incoming = endpoint.incoming().expect("failed to open up a new socket");
		futures::pin_mut!(incoming);

		while let Some(result) = incoming.next().await {
			// Surface the underlying error instead of hiding it behind `unreachable!`.
			let stream = result.expect("failed to accept incoming connection");
			let (mut reader, mut writer) = split(stream);
			let mut buf = [0u8; ECHO_LEN];
			reader.read_exact(&mut buf).await.expect("unable to read from socket");
			writer.write_all(&buf).await.expect("unable to write to socket");
		}
	}

	/// Starts the echo server, connects two clients, checks that both get
	/// their message echoed back, then shuts the server down and checks that
	/// the endpoint path no longer exists.
	#[tokio::test]
	async fn smoke_test() {
		let path = dummy_endpoint();
		let (shutdown_tx, shutdown_rx) = oneshot::channel();

		// Race the server against the shutdown signal; the server is expected
		// to keep running until the signal wins.
		let server = select(Box::pin(run_server(path.clone())), shutdown_rx)
			.then(|either| {
				match either {
					// Shutdown requested: drop the still-pending server future.
					Either::Right((_, server)) => drop(server),
					Either::Left(_) => unreachable!("also ideally"),
				};
				ready(())
			});
		tokio::spawn(server);

		// Give the spawned server time to start listening.
		tokio::time::sleep(Duration::from_secs(2)).await;

		println!("Connecting to client 0...");
		let mut client_0 = Endpoint::connect(&path).await
			.expect("failed to open client_0");
		tokio::time::sleep(Duration::from_secs(2)).await;
		println!("Connecting to client 1...");
		let mut client_1 = Endpoint::connect(&path).await
			.expect("failed to open client_1");
		let msg = ECHO_MSG;

		let mut rx_buf = vec![0u8; msg.len()];
		client_0.write_all(msg).await.expect("Unable to write message to client");
		client_0.read_exact(&mut rx_buf).await.expect("Unable to read message from client");

		let mut rx_buf2 = vec![0u8; msg.len()];
		client_1.write_all(msg).await.expect("Unable to write message to client");
		client_1.read_exact(&mut rx_buf2).await.expect("Unable to read message from client");

		assert_eq!(rx_buf, msg);
		assert_eq!(rx_buf2, msg);

		// Shut down the server. The cleanup check only runs when the signal
		// was delivered, i.e. the server task was still alive to receive it.
		if shutdown_tx.send(()).is_ok() {
			// wait one second for the file to be deleted.
			tokio::time::sleep(Duration::from_secs(1)).await;
			let path = Path::new(&path);
			// assert that it has been removed
			assert!(!path.exists());
		}
	}

	/// Compile-time check that the value returned by `Endpoint::incoming`
	/// satisfies a `'static` bound.
	#[tokio::test]
	async fn incoming_stream_is_static() {
		fn is_static<T: 'static>(_: T) {}

		let path = dummy_endpoint();
		let endpoint = Endpoint::new(path);
		is_static(endpoint.incoming());
	}

	/// Creates an endpoint at a fresh dummy path with `attr` applied and
	/// tries to start listening on it, returning any error from `incoming`.
	#[cfg(windows)]
	fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> {
		let path = dummy_endpoint();

		let mut endpoint = Endpoint::new(path);
		endpoint.set_security_attributes(attr);
		endpoint.incoming().map(|_| ())
	}

	/// Checks that a pipe can be opened with empty attributes, with
	/// "everyone may create" attributes, and with "everyone may connect"
	/// attributes.
	#[cfg(windows)]
	#[tokio::test]
	async fn test_pipe_permissions() {
		create_pipe_with_permissions(SecurityAttributes::empty())
			.expect("failed with no attributes");
		create_pipe_with_permissions(SecurityAttributes::allow_everyone_create().unwrap())
			.expect("failed with attributes for creating");
		create_pipe_with_permissions(SecurityAttributes::empty().allow_everyone_connect().unwrap())
			.expect("failed with attributes for connecting");
	}
}