1use std::{
2 borrow::Borrow,
3 io,
4 net::{Ipv4Addr, Ipv6Addr, SocketAddr},
5 ops::{Deref, DerefMut},
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures_util::stream::{self, Fuse, Stream, StreamExt};
11#[cfg(feature = "tokio")]
12use tokio::net::TcpStream;
13
14#[cfg(feature = "tokio")]
15use crate::ToProxyAddrs;
16use crate::{
17 io::{AsyncSocket, AsyncSocketExt},
18 Authentication,
19 Error,
20 IntoTargetAddr,
21 Result,
22 TargetAddr,
23};
24
25#[repr(u8)]
26#[derive(Clone, Copy)]
27enum Command {
28 Connect = 0x01,
29 Bind = 0x02,
30 #[allow(dead_code)]
31 Associate = 0x03,
32 #[cfg(feature = "tor")]
33 TorResolve = 0xF0,
34 #[cfg(feature = "tor")]
35 TorResolvePtr = 0xF1,
36}
37
38#[derive(Debug)]
42pub struct Socks5Stream<S> {
43 socket: S,
44 target: TargetAddr<'static>,
45}
46
47impl<S> Deref for Socks5Stream<S> {
48 type Target = S;
49
50 fn deref(&self) -> &Self::Target {
51 &self.socket
52 }
53}
54
55impl<S> DerefMut for Socks5Stream<S> {
56 fn deref_mut(&mut self) -> &mut Self::Target {
57 &mut self.socket
58 }
59}
60
61#[cfg(feature = "tokio")]
62impl Socks5Stream<TcpStream> {
63 pub async fn connect<'t, P, T>(proxy: P, target: T) -> Result<Socks5Stream<TcpStream>>
71 where
72 P: ToProxyAddrs,
73 T: IntoTargetAddr<'t>,
74 {
75 Self::execute_command(proxy, target, Authentication::None, Command::Connect).await
76 }
77
78 pub async fn connect_with_password<'a, 't, P, T>(
86 proxy: P,
87 target: T,
88 username: &'a str,
89 password: &'a str,
90 ) -> Result<Socks5Stream<TcpStream>>
91 where
92 P: ToProxyAddrs,
93 T: IntoTargetAddr<'t>,
94 {
95 Self::execute_command(
96 proxy,
97 target,
98 Authentication::Password { username, password },
99 Command::Connect,
100 )
101 .await
102 }
103
104 #[cfg(feature = "tor")]
105 pub async fn tor_resolve<'t, P, T>(proxy: P, target: T) -> Result<TargetAddr<'static>>
108 where
109 P: ToProxyAddrs,
110 T: IntoTargetAddr<'t>,
111 {
112 let sock = Self::execute_command(proxy, target, Authentication::None, Command::TorResolve).await?;
113
114 Ok(sock.target_addr().to_owned())
115 }
116
117 #[cfg(feature = "tor")]
118 pub async fn tor_resolve_ptr<'t, P, T>(proxy: P, target: T) -> Result<TargetAddr<'static>>
122 where
123 P: ToProxyAddrs,
124 T: IntoTargetAddr<'t>,
125 {
126 let sock = Self::execute_command(proxy, target, Authentication::None, Command::TorResolvePtr).await?;
127
128 Ok(sock.target_addr().to_owned())
129 }
130
131 async fn execute_command<'a, 't, P, T>(
132 proxy: P,
133 target: T,
134 auth: Authentication<'a>,
135 command: Command,
136 ) -> Result<Socks5Stream<TcpStream>>
137 where
138 P: ToProxyAddrs,
139 T: IntoTargetAddr<'t>,
140 {
141 Self::validate_auth(&auth)?;
142
143 let sock = SocksConnector::new(auth, command, proxy.to_proxy_addrs().fuse(), target.into_target_addr()?)
144 .execute()
145 .await?;
146
147 Ok(sock)
148 }
149}
150
151impl<S> Socks5Stream<S>
152where S: AsyncSocket + Unpin
153{
154 pub async fn connect_with_socket<'t, T>(socket: S, target: T) -> Result<Socks5Stream<S>>
161 where T: IntoTargetAddr<'t> {
162 Self::execute_command_with_socket(socket, target, Authentication::None, Command::Connect).await
163 }
164
165 pub async fn connect_with_password_and_socket<'a, 't, T>(
173 socket: S,
174 target: T,
175 username: &'a str,
176 password: &'a str,
177 ) -> Result<Socks5Stream<S>>
178 where
179 T: IntoTargetAddr<'t>,
180 {
181 Self::execute_command_with_socket(
182 socket,
183 target,
184 Authentication::Password { username, password },
185 Command::Connect,
186 )
187 .await
188 }
189
190 fn validate_auth(auth: &Authentication<'_>) -> Result<()> {
191 match auth {
192 Authentication::Password { username, password } => {
193 let username_len = username.as_bytes().len();
194 if !(1..=255).contains(&username_len) {
195 Err(Error::InvalidAuthValues("username length should between 1 to 255"))?
196 }
197 let password_len = password.as_bytes().len();
198 if !(1..=255).contains(&password_len) {
199 Err(Error::InvalidAuthValues("password length should between 1 to 255"))?
200 }
201 },
202 Authentication::None => {},
203 }
204 Ok(())
205 }
206
207 #[cfg(feature = "tor")]
208 pub async fn tor_resolve_with_socket<'t, T>(socket: S, target: T) -> Result<TargetAddr<'static>>
211 where T: IntoTargetAddr<'t> {
212 let sock = Self::execute_command_with_socket(socket, target, Authentication::None, Command::TorResolve).await?;
213
214 Ok(sock.target_addr().to_owned())
215 }
216
217 #[cfg(feature = "tor")]
218 pub async fn tor_resolve_ptr_with_socket<'t, T>(socket: S, target: T) -> Result<TargetAddr<'static>>
222 where T: IntoTargetAddr<'t> {
223 let sock =
224 Self::execute_command_with_socket(socket, target, Authentication::None, Command::TorResolvePtr).await?;
225
226 Ok(sock.target_addr().to_owned())
227 }
228
229 async fn execute_command_with_socket<'a, 't, T>(
230 socket: S,
231 target: T,
232 auth: Authentication<'a>,
233 command: Command,
234 ) -> Result<Socks5Stream<S>>
235 where
236 T: IntoTargetAddr<'t>,
237 {
238 Self::validate_auth(&auth)?;
239
240 let sock = SocksConnector::new(auth, command, stream::empty().fuse(), target.into_target_addr()?)
241 .execute_with_socket(socket)
242 .await?;
243
244 Ok(sock)
245 }
246
247 pub fn into_inner(self) -> S {
249 self.socket
250 }
251
252 pub fn target_addr(&self) -> TargetAddr<'_> {
254 match &self.target {
255 TargetAddr::Ip(addr) => TargetAddr::Ip(*addr),
256 TargetAddr::Domain(domain, port) => {
257 let domain: &str = domain.borrow();
258 TargetAddr::Domain(domain.into(), *port)
259 },
260 }
261 }
262}
263
264pub struct SocksConnector<'a, 't, S> {
266 auth: Authentication<'a>,
267 command: Command,
268 #[allow(dead_code)]
269 proxy: Fuse<S>,
270 target: TargetAddr<'t>,
271 buf: [u8; 513],
272 ptr: usize,
273 len: usize,
274}
275
276impl<'a, 't, S> SocksConnector<'a, 't, S>
277where S: Stream<Item = Result<SocketAddr>> + Unpin
278{
279 fn new(auth: Authentication<'a>, command: Command, proxy: Fuse<S>, target: TargetAddr<'t>) -> Self {
280 SocksConnector {
281 auth,
282 command,
283 proxy,
284 target,
285 buf: [0; 513],
286 ptr: 0,
287 len: 0,
288 }
289 }
290
291 #[cfg(feature = "tokio")]
292 pub async fn execute(&mut self) -> Result<Socks5Stream<TcpStream>> {
294 let next_addr = self.proxy.select_next_some().await?;
295 let tcp = TcpStream::connect(next_addr)
296 .await
297 .map_err(|_| Error::ProxyServerUnreachable)?;
298
299 self.execute_with_socket(tcp).await
300 }
301
302 pub async fn execute_with_socket<T: AsyncSocket + Unpin>(&mut self, mut socket: T) -> Result<Socks5Stream<T>> {
303 self.authenticate(&mut socket).await?;
304
305 self.prepare_send_request();
307 socket.write_all(&self.buf[self.ptr..self.len]).await?;
308
309 let target = self.receive_reply(&mut socket).await?;
310
311 Ok(Socks5Stream { socket, target })
312 }
313
314 fn prepare_send_method_selection(&mut self) {
315 self.ptr = 0;
316 self.buf[0] = 0x05;
317 match self.auth {
318 Authentication::None => {
319 self.buf[1..3].copy_from_slice(&[1, 0x00]);
320 self.len = 3;
321 },
322 Authentication::Password { .. } => {
323 self.buf[1..4].copy_from_slice(&[2, 0x00, 0x02]);
324 self.len = 4;
325 },
326 }
327 }
328
329 fn prepare_recv_method_selection(&mut self) {
330 self.ptr = 0;
331 self.len = 2;
332 }
333
334 fn prepare_send_password_auth(&mut self) {
335 if let Authentication::Password { username, password } = self.auth {
336 self.ptr = 0;
337 self.buf[0] = 0x01;
338 let username_bytes = username.as_bytes();
339 let username_len = username_bytes.len();
340 self.buf[1] = username_len as u8;
341 self.buf[2..(2 + username_len)].copy_from_slice(username_bytes);
342 let password_bytes = password.as_bytes();
343 let password_len = password_bytes.len();
344 self.len = 3 + username_len + password_len;
345 self.buf[2 + username_len] = password_len as u8;
346 self.buf[(3 + username_len)..self.len].copy_from_slice(password_bytes);
347 } else {
348 unreachable!()
349 }
350 }
351
352 fn prepare_recv_password_auth(&mut self) {
353 self.ptr = 0;
354 self.len = 2;
355 }
356
357 fn prepare_send_request(&mut self) {
358 self.ptr = 0;
359 self.buf[..3].copy_from_slice(&[0x05, self.command as u8, 0x00]);
360 match &self.target {
361 TargetAddr::Ip(SocketAddr::V4(addr)) => {
362 self.buf[3] = 0x01;
363 self.buf[4..8].copy_from_slice(&addr.ip().octets());
364 self.buf[8..10].copy_from_slice(&addr.port().to_be_bytes());
365 self.len = 10;
366 },
367 TargetAddr::Ip(SocketAddr::V6(addr)) => {
368 self.buf[3] = 0x04;
369 self.buf[4..20].copy_from_slice(&addr.ip().octets());
370 self.buf[20..22].copy_from_slice(&addr.port().to_be_bytes());
371 self.len = 22;
372 },
373 TargetAddr::Domain(domain, port) => {
374 self.buf[3] = 0x03;
375 let domain = domain.as_bytes();
376 let len = domain.len();
377 self.buf[4] = len as u8;
378 self.buf[5..5 + len].copy_from_slice(domain);
379 self.buf[(5 + len)..(7 + len)].copy_from_slice(&port.to_be_bytes());
380 self.len = 7 + len;
381 },
382 }
383 }
384
385 fn prepare_recv_reply(&mut self) {
386 self.ptr = 0;
387 self.len = 4;
388 }
389
390 async fn password_authentication_protocol<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<()> {
391 if let Authentication::None = self.auth {
392 return Err(Error::AuthorizationRequired);
393 }
394
395 self.prepare_send_password_auth();
396 tcp.write_all(&self.buf[self.ptr..self.len]).await?;
397
398 self.prepare_recv_password_auth();
399 tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
400
401 if self.buf[0] != 0x01 {
402 return Err(Error::InvalidResponseVersion);
403 }
404 if self.buf[1] != 0x00 {
405 return Err(Error::PasswordAuthFailure(self.buf[1]));
406 }
407
408 Ok(())
409 }
410
411 async fn authenticate<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<()> {
412 self.prepare_send_method_selection();
414 tcp.write_all(&self.buf[self.ptr..self.len]).await?;
415
416 self.prepare_recv_method_selection();
418 tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
419 if self.buf[0] != 0x05 {
420 return Err(Error::InvalidResponseVersion);
421 }
422 match self.buf[1] {
423 0x00 => {
424 },
426 0x02 => {
427 self.password_authentication_protocol(tcp).await?;
428 },
429 0xff => {
430 return Err(Error::NoAcceptableAuthMethods);
431 },
432 m if m != self.auth.id() => return Err(Error::UnknownAuthMethod),
433 _ => unimplemented!(),
434 }
435
436 Ok(())
437 }
438
439 async fn receive_reply<T: AsyncSocket + Unpin>(&mut self, tcp: &mut T) -> Result<TargetAddr<'static>> {
440 self.prepare_recv_reply();
441 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
442 if self.buf[0] != 0x05 {
443 return Err(Error::InvalidResponseVersion);
444 }
445 if self.buf[2] != 0x00 {
446 return Err(Error::InvalidReservedByte);
447 }
448
449 match self.buf[1] {
450 0x00 => {}, 0x01 => Err(Error::GeneralSocksServerFailure)?,
452 0x02 => Err(Error::ConnectionNotAllowedByRuleset)?,
453 0x03 => Err(Error::NetworkUnreachable)?,
454 0x04 => Err(Error::HostUnreachable)?,
455 0x05 => Err(Error::ConnectionRefused)?,
456 0x06 => Err(Error::TtlExpired)?,
457 0x07 => Err(Error::CommandNotSupported)?,
458 0x08 => Err(Error::AddressTypeNotSupported)?,
459 _ => Err(Error::UnknownAuthMethod)?,
460 }
461
462 match self.buf[3] {
463 0x01 => {
465 self.len = 10;
466 },
467 0x04 => {
469 self.len = 22;
470 },
471 0x03 => {
473 self.len = 5;
474 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
475 self.len += self.buf[4] as usize + 2;
476 },
477 _ => Err(Error::UnknownAddressType)?,
478 }
479
480 self.ptr += tcp.read_exact(&mut self.buf[self.ptr..self.len]).await?;
481 let target: TargetAddr<'static> = match self.buf[3] {
482 0x01 => {
484 let mut ip = [0; 4];
485 ip[..].copy_from_slice(&self.buf[4..8]);
486 let ip = Ipv4Addr::from(ip);
487 let port = u16::from_be_bytes([self.buf[8], self.buf[9]]);
488 (ip, port).into_target_addr()?
489 },
490 0x04 => {
492 let mut ip = [0; 16];
493 ip[..].copy_from_slice(&self.buf[4..20]);
494 let ip = Ipv6Addr::from(ip);
495 let port = u16::from_be_bytes([self.buf[20], self.buf[21]]);
496 (ip, port).into_target_addr()?
497 },
498 0x03 => {
500 let domain_bytes = self.buf[5..(self.len - 2)].to_vec();
501 let domain = String::from_utf8(domain_bytes)
502 .map_err(|_| Error::InvalidTargetAddress("not a valid UTF-8 string"))?;
503 let port = u16::from_be_bytes([self.buf[self.len - 2], self.buf[self.len - 1]]);
504 TargetAddr::Domain(domain.into(), port)
505 },
506 _ => unreachable!(),
507 };
508
509 Ok(target)
510 }
511}
512
513pub struct Socks5Listener<S> {
520 inner: Socks5Stream<S>,
521}
522
523#[cfg(feature = "tokio")]
524impl Socks5Listener<TcpStream> {
525 pub async fn bind<'t, P, T>(proxy: P, target: T) -> Result<Socks5Listener<TcpStream>>
535 where
536 P: ToProxyAddrs,
537 T: IntoTargetAddr<'t>,
538 {
539 Self::bind_with_auth(Authentication::None, proxy, target).await
540 }
541
542 pub async fn bind_with_password<'a, 't, P, T>(
553 proxy: P,
554 target: T,
555 username: &'a str,
556 password: &'a str,
557 ) -> Result<Socks5Listener<TcpStream>>
558 where
559 P: ToProxyAddrs,
560 T: IntoTargetAddr<'t>,
561 {
562 Self::bind_with_auth(Authentication::Password { username, password }, proxy, target).await
563 }
564
565 async fn bind_with_auth<'t, P, T>(
566 auth: Authentication<'_>,
567 proxy: P,
568 target: T,
569 ) -> Result<Socks5Listener<TcpStream>>
570 where
571 P: ToProxyAddrs,
572 T: IntoTargetAddr<'t>,
573 {
574 let socket = SocksConnector::new(
575 auth,
576 Command::Bind,
577 proxy.to_proxy_addrs().fuse(),
578 target.into_target_addr()?,
579 )
580 .execute()
581 .await?;
582
583 Ok(Socks5Listener { inner: socket })
584 }
585}
586
587impl<S> Socks5Listener<S>
588where S: AsyncSocket + Unpin
589{
590 pub async fn bind_with_socket<'t, T>(socket: S, target: T) -> Result<Socks5Listener<S>>
601 where T: IntoTargetAddr<'t> {
602 Self::bind_with_auth_and_socket(Authentication::None, socket, target).await
603 }
604
605 pub async fn bind_with_password_and_socket<'a, 't, T>(
616 socket: S,
617 target: T,
618 username: &'a str,
619 password: &'a str,
620 ) -> Result<Socks5Listener<S>>
621 where
622 T: IntoTargetAddr<'t>,
623 {
624 Self::bind_with_auth_and_socket(Authentication::Password { username, password }, socket, target).await
625 }
626
627 async fn bind_with_auth_and_socket<'t, T>(
628 auth: Authentication<'_>,
629 socket: S,
630 target: T,
631 ) -> Result<Socks5Listener<S>>
632 where
633 T: IntoTargetAddr<'t>,
634 {
635 let socket = SocksConnector::new(auth, Command::Bind, stream::empty().fuse(), target.into_target_addr()?)
636 .execute_with_socket(socket)
637 .await?;
638
639 Ok(Socks5Listener { inner: socket })
640 }
641
642 pub fn bind_addr(&self) -> TargetAddr {
647 self.inner.target_addr()
648 }
649
650 pub async fn accept(mut self) -> Result<Socks5Stream<S>> {
656 let mut connector = SocksConnector {
657 auth: Authentication::None,
658 command: Command::Bind,
659 proxy: stream::empty().fuse(),
660 target: self.inner.target,
661 buf: [0; 513],
662 ptr: 0,
663 len: 0,
664 };
665
666 let target = connector.receive_reply(&mut self.inner.socket).await?;
667
668 Ok(Socks5Stream {
669 socket: self.inner.socket,
670 target,
671 })
672 }
673}
674
675#[cfg(feature = "tokio")]
676impl<T> tokio::io::AsyncRead for Socks5Stream<T>
677where T: tokio::io::AsyncRead + Unpin
678{
679 fn poll_read(
680 mut self: Pin<&mut Self>,
681 cx: &mut Context<'_>,
682 buf: &mut tokio::io::ReadBuf<'_>,
683 ) -> Poll<io::Result<()>> {
684 tokio::io::AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
685 }
686}
687
688#[cfg(feature = "tokio")]
689impl<T> tokio::io::AsyncWrite for Socks5Stream<T>
690where T: tokio::io::AsyncWrite + Unpin
691{
692 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
693 tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
694 }
695
696 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
697 tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
698 }
699
700 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
701 tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.socket), cx)
702 }
703}
704
705#[cfg(feature = "futures-io")]
706impl<T> futures_io::AsyncRead for Socks5Stream<T>
707where T: futures_io::AsyncRead + Unpin
708{
709 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
710 futures_io::AsyncRead::poll_read(Pin::new(&mut self.socket), cx, buf)
711 }
712}
713
714#[cfg(feature = "futures-io")]
715impl<T> futures_io::AsyncWrite for Socks5Stream<T>
716where T: futures_io::AsyncWrite + Unpin
717{
718 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
719 futures_io::AsyncWrite::poll_write(Pin::new(&mut self.socket), cx, buf)
720 }
721
722 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
723 futures_io::AsyncWrite::poll_flush(Pin::new(&mut self.socket), cx)
724 }
725
726 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
727 futures_io::AsyncWrite::poll_close(Pin::new(&mut self.socket), cx)
728 }
729}