1#![deny(rust_2018_idioms, nonstandard_style)]
4#![warn(future_incompatible)]
5#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
6#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
7#![cfg_attr(docsrs, feature(doc_auto_cfg))]
8
9#[cfg(feature = "openssl")]
10extern crate tls_openssl as openssl;
11
12use std::{net, thread, time::Duration};
13
14use actix_codec::{AsyncRead, AsyncWrite, Framed};
15use actix_rt::{net::TcpStream, System};
16use actix_server::{Server, ServerServiceFactory};
17use awc::{
18 error::PayloadError, http::header::HeaderMap, ws, Client, ClientRequest, ClientResponse,
19 Connector,
20};
21use bytes::Bytes;
22use futures_core::stream::Stream;
23use http::Method;
24use socket2::{Domain, Protocol, Socket, Type};
25use tokio::sync::mpsc;
26
27pub async fn test_server<F: ServerServiceFactory<TcpStream>>(factory: F) -> TestServer {
60 let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
61 test_server_with_addr(tcp, factory).await
62}
63
64pub async fn test_server_with_addr<F: ServerServiceFactory<TcpStream>>(
66 tcp: net::TcpListener,
67 factory: F,
68) -> TestServer {
69 let (started_tx, started_rx) = std::sync::mpsc::channel();
70 let (thread_stop_tx, thread_stop_rx) = mpsc::channel(1);
71
72 thread::spawn(move || {
74 System::new().block_on(async move {
75 let local_addr = tcp.local_addr().unwrap();
76
77 let srv = Server::build()
78 .workers(1)
79 .disable_signals()
80 .system_exit()
81 .listen("test", tcp, factory)
82 .expect("test server could not be created");
83
84 let srv = srv.run();
85 started_tx
86 .send((System::current(), srv.handle(), local_addr))
87 .unwrap();
88
89 srv.await.unwrap();
91 });
92
93 #[allow(clippy::let_underscore_future)]
96 let _ = thread_stop_tx.send(());
97 });
98
99 let (system, server, addr) = started_rx.recv().unwrap();
100
101 let client = {
102 #[cfg(feature = "openssl")]
103 let connector = {
104 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
105
106 let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
107
108 builder.set_verify(SslVerifyMode::NONE);
109 let _ = builder
110 .set_alpn_protos(b"\x02h2\x08http/1.1")
111 .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
112
113 Connector::new()
114 .conn_lifetime(Duration::from_secs(0))
115 .timeout(Duration::from_millis(30000))
116 .openssl(builder.build())
117 };
118
119 #[cfg(not(feature = "openssl"))]
120 let connector = {
121 Connector::new()
122 .conn_lifetime(Duration::from_secs(0))
123 .timeout(Duration::from_millis(30000))
124 };
125
126 Client::builder().connector(connector).finish()
127 };
128
129 TestServer {
130 server,
131 client,
132 system,
133 addr,
134 thread_stop_rx,
135 }
136}
137
/// Handle to a running test server together with a client configured to talk to it.
///
/// Dropping the value requests a server stop and stops the system; see `stop` for a shutdown
/// that can be awaited.
pub struct TestServer {
    /// Handle used to request that the server stops.
    server: actix_server::ServerHandle,
    /// HTTP client used by the request helper methods.
    client: awc::Client,
    /// System the server was started in; stopped on `stop` and on drop.
    system: actix_rt::System,
    /// Local address of the listener the server is using.
    addr: net::SocketAddr,
    /// Receiver awaited by `stop` to learn that the server thread has finished.
    thread_stop_rx: mpsc::Receiver<()>,
}
146
147impl TestServer {
148 pub fn addr(&self) -> net::SocketAddr {
150 self.addr
151 }
152
153 pub fn url(&self, uri: &str) -> String {
155 if uri.starts_with('/') {
156 format!("http://localhost:{}{}", self.addr.port(), uri)
157 } else {
158 format!("http://localhost:{}/{}", self.addr.port(), uri)
159 }
160 }
161
162 pub fn surl(&self, uri: &str) -> String {
164 if uri.starts_with('/') {
165 format!("https://localhost:{}{}", self.addr.port(), uri)
166 } else {
167 format!("https://localhost:{}/{}", self.addr.port(), uri)
168 }
169 }
170
171 pub fn get<S: AsRef<str>>(&self, path: S) -> ClientRequest {
173 self.client.get(self.url(path.as_ref()).as_str())
174 }
175
176 pub fn sget<S: AsRef<str>>(&self, path: S) -> ClientRequest {
178 self.client.get(self.surl(path.as_ref()).as_str())
179 }
180
181 pub fn post<S: AsRef<str>>(&self, path: S) -> ClientRequest {
183 self.client.post(self.url(path.as_ref()).as_str())
184 }
185
186 pub fn spost<S: AsRef<str>>(&self, path: S) -> ClientRequest {
188 self.client.post(self.surl(path.as_ref()).as_str())
189 }
190
191 pub fn head<S: AsRef<str>>(&self, path: S) -> ClientRequest {
193 self.client.head(self.url(path.as_ref()).as_str())
194 }
195
196 pub fn shead<S: AsRef<str>>(&self, path: S) -> ClientRequest {
198 self.client.head(self.surl(path.as_ref()).as_str())
199 }
200
201 pub fn put<S: AsRef<str>>(&self, path: S) -> ClientRequest {
203 self.client.put(self.url(path.as_ref()).as_str())
204 }
205
206 pub fn sput<S: AsRef<str>>(&self, path: S) -> ClientRequest {
208 self.client.put(self.surl(path.as_ref()).as_str())
209 }
210
211 pub fn patch<S: AsRef<str>>(&self, path: S) -> ClientRequest {
213 self.client.patch(self.url(path.as_ref()).as_str())
214 }
215
216 pub fn spatch<S: AsRef<str>>(&self, path: S) -> ClientRequest {
218 self.client.patch(self.surl(path.as_ref()).as_str())
219 }
220
221 pub fn delete<S: AsRef<str>>(&self, path: S) -> ClientRequest {
223 self.client.delete(self.url(path.as_ref()).as_str())
224 }
225
226 pub fn sdelete<S: AsRef<str>>(&self, path: S) -> ClientRequest {
228 self.client.delete(self.surl(path.as_ref()).as_str())
229 }
230
231 pub fn options<S: AsRef<str>>(&self, path: S) -> ClientRequest {
233 self.client.options(self.url(path.as_ref()).as_str())
234 }
235
236 pub fn soptions<S: AsRef<str>>(&self, path: S) -> ClientRequest {
238 self.client.options(self.surl(path.as_ref()).as_str())
239 }
240
241 pub fn request<S: AsRef<str>>(&self, method: Method, path: S) -> ClientRequest {
243 self.client.request(method, path.as_ref())
244 }
245
246 pub async fn load_body<S>(
247 &mut self,
248 mut response: ClientResponse<S>,
249 ) -> Result<Bytes, PayloadError>
250 where
251 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
252 {
253 response.body().limit(10_485_760).await
254 }
255
256 pub async fn ws_at(
258 &mut self,
259 path: &str,
260 ) -> Result<Framed<impl AsyncRead + AsyncWrite, ws::Codec>, awc::error::WsClientError> {
261 let url = self.url(path);
262 let connect = self.client.ws(url).connect();
263 connect.await.map(|(_, framed)| framed)
264 }
265
266 pub async fn ws(
268 &mut self,
269 ) -> Result<Framed<impl AsyncRead + AsyncWrite, ws::Codec>, awc::error::WsClientError> {
270 self.ws_at("/").await
271 }
272
273 pub fn client_headers(&mut self) -> Option<&mut HeaderMap> {
278 self.client.headers()
279 }
280
281 pub async fn stop(&mut self) {
285 self.server.stop(false).await;
287
288 self.system.stop();
291
292 let _ = self.thread_stop_rx.recv().await;
294 }
295}
296
297impl Drop for TestServer {
298 fn drop(&mut self) {
299 #[allow(clippy::let_underscore_future)]
304 let _ = self.server.stop(true);
305
306 self.system.stop();
308 }
309}
310
311pub fn unused_addr() -> net::SocketAddr {
313 let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
314 let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)).unwrap();
315 socket.bind(&addr.into()).unwrap();
316 socket.set_reuse_address(true).unwrap();
317 let tcp = net::TcpListener::from(socket);
318 tcp.local_addr().unwrap()
319}