1pub mod upgrade;
4
5use futures_util::ready;
6use hyper::service::HttpService;
7use std::future::Future;
8use std::marker::PhantomPinned;
9use std::mem::MaybeUninit;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::{error::Error as StdError, io, time::Duration};
13
14use bytes::Bytes;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18 body::Incoming,
19 rt::{Read, ReadBuf, Timer, Write},
20 service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
/// Boxed, thread-safe error type produced by the connection futures in this module.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// Shorthand for a `Result` whose error type is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// The 24-byte sequence a client sends first on an HTTP/2 connection. Incoming bytes are
/// compared against it to decide between HTTP/2 and HTTP/1.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
/// Executor bound required by the connection types in this module.
///
/// With the `http2` feature enabled this is a marker over `Http2ServerConnExec`; it is
/// implemented automatically (see the blanket impl below) for every type that satisfies it.
#[cfg(feature = "http2")]
pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}

// Blanket impl: anything usable as an HTTP/2 server executor is an `HttpServerConnExec`.
#[cfg(feature = "http2")]
impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}

/// Executor bound required by the connection types in this module.
///
/// Without the `http2` feature the trait has no supertrait and no methods, and is
/// implemented for every type.
#[cfg(not(feature = "http2"))]
pub trait HttpServerConnExec<A, B: Body> {}

// Blanket impl: with HTTP/2 compiled out, every type qualifies.
#[cfg(not(feature = "http2"))]
impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
/// Server connection builder that can serve HTTP/1 and HTTP/2 connections, picking the
/// protocol per connection unless it has been restricted to a single version.
#[derive(Clone, Debug)]
pub struct Builder<E> {
    // Configuration used when a connection is served as HTTP/1.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // Configuration (and executor) used when a connection is served as HTTP/2.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some(..)` once the builder was restricted via `http1_only` / `http2_only`;
    // `None` means the version is detected from the first bytes of each connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // Without HTTP/2 nothing consumes the executor, so it is only stored.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
68
69impl<E> Builder<E> {
70 pub fn new(executor: E) -> Self {
86 Self {
87 #[cfg(feature = "http1")]
88 http1: http1::Builder::new(),
89 #[cfg(feature = "http2")]
90 http2: http2::Builder::new(executor),
91 #[cfg(any(feature = "http1", feature = "http2"))]
92 version: None,
93 #[cfg(not(feature = "http2"))]
94 _executor: executor,
95 }
96 }
97
98 #[cfg(feature = "http1")]
100 pub fn http1(&mut self) -> Http1Builder<'_, E> {
101 Http1Builder { inner: self }
102 }
103
104 #[cfg(feature = "http2")]
106 pub fn http2(&mut self) -> Http2Builder<'_, E> {
107 Http2Builder { inner: self }
108 }
109
110 #[cfg(feature = "http2")]
116 pub fn http2_only(mut self) -> Self {
117 assert!(self.version.is_none());
118 self.version = Some(Version::H2);
119 self
120 }
121
122 #[cfg(feature = "http1")]
128 pub fn http1_only(mut self) -> Self {
129 assert!(self.version.is_none());
130 self.version = Some(Version::H1);
131 self
132 }
133
134 pub fn is_http1_available(&self) -> bool {
136 match self.version {
137 #[cfg(feature = "http1")]
138 Some(Version::H1) => true,
139 #[cfg(feature = "http2")]
140 Some(Version::H2) => false,
141 #[cfg(any(feature = "http1", feature = "http2"))]
142 _ => true,
143 }
144 }
145
146 pub fn is_http2_available(&self) -> bool {
148 match self.version {
149 #[cfg(feature = "http1")]
150 Some(Version::H1) => false,
151 #[cfg(feature = "http2")]
152 Some(Version::H2) => true,
153 #[cfg(any(feature = "http1", feature = "http2"))]
154 _ => true,
155 }
156 }
157
158 pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
160 where
161 S: Service<Request<Incoming>, Response = Response<B>>,
162 S::Future: 'static,
163 S::Error: Into<Box<dyn StdError + Send + Sync>>,
164 B: Body + 'static,
165 B::Error: Into<Box<dyn StdError + Send + Sync>>,
166 I: Read + Write + Unpin + 'static,
167 E: HttpServerConnExec<S::Future, B>,
168 {
169 let state = match self.version {
170 #[cfg(feature = "http1")]
171 Some(Version::H1) => {
172 let io = Rewind::new_buffered(io, Bytes::new());
173 let conn = self.http1.serve_connection(io, service);
174 ConnState::H1 { conn }
175 }
176 #[cfg(feature = "http2")]
177 Some(Version::H2) => {
178 let io = Rewind::new_buffered(io, Bytes::new());
179 let conn = self.http2.serve_connection(io, service);
180 ConnState::H2 { conn }
181 }
182 #[cfg(any(feature = "http1", feature = "http2"))]
183 _ => ConnState::ReadVersion {
184 read_version: read_version(io),
185 builder: Cow::Borrowed(self),
186 service: Some(service),
187 },
188 };
189
190 Connection { state }
191 }
192
193 pub fn serve_connection_with_upgrades<I, S, B>(
203 &self,
204 io: I,
205 service: S,
206 ) -> UpgradeableConnection<'_, I, S, E>
207 where
208 S: Service<Request<Incoming>, Response = Response<B>>,
209 S::Future: 'static,
210 S::Error: Into<Box<dyn StdError + Send + Sync>>,
211 B: Body + 'static,
212 B::Error: Into<Box<dyn StdError + Send + Sync>>,
213 I: Read + Write + Unpin + Send + 'static,
214 E: HttpServerConnExec<S::Future, B>,
215 {
216 UpgradeableConnection {
217 state: UpgradeableConnState::ReadVersion {
218 read_version: read_version(io),
219 builder: Cow::Borrowed(self),
220 service: Some(service),
221 },
222 }
223 }
224}
225
226#[derive(Copy, Clone, Debug)]
227enum Version {
228 H1,
229 H2,
230}
231
232impl Version {
233 #[must_use]
234 #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
235 pub fn unsupported(self) -> Error {
236 match self {
237 Version::H1 => Error::from("HTTP/1 is not supported"),
238 Version::H2 => Error::from("HTTP/2 is not supported"),
239 }
240 }
241}
242
243fn read_version<I>(io: I) -> ReadVersion<I>
244where
245 I: Read + Unpin,
246{
247 ReadVersion {
248 io: Some(io),
249 buf: [MaybeUninit::uninit(); 24],
250 filled: 0,
251 version: Version::H2,
252 cancelled: false,
253 _pin: PhantomPinned,
254 }
255}
256
// Future that reads the beginning of a connection and reports which HTTP version the
// client is speaking, handing back the IO object together with the bytes already read.
pin_project! {
    struct ReadVersion<I> {
        // The IO object; taken out (left as `None`) when the future completes successfully.
        io: Option<I>,
        // Scratch buffer for the bytes read so far (same length as the HTTP/2 preface).
        buf: [MaybeUninit<u8>; 24],
        // Number of leading bytes of `buf` that have been read so far.
        filled: usize,
        // Detected version; starts as HTTP/2 and is switched to HTTP/1 on a mismatch.
        version: Version,
        // Set by `cancel`; makes the next poll fail with an `Interrupted` error.
        cancelled: bool,
        // Marker that makes this future `!Unpin`.
        #[pin]
        _pin: PhantomPinned,
    }
}
270
271impl<I> ReadVersion<I> {
272 pub fn cancel(self: Pin<&mut Self>) {
273 *self.project().cancelled = true;
274 }
275}
276
/// Resolves to the detected [`Version`] plus the IO object wrapped in a `Rewind` that was
/// given every byte consumed during detection.
impl<I> Future for ReadVersion<I>
where
    I: Read + Unpin,
{
    type Output = io::Result<(Version, Rewind<I>)>;

    /// Reads from the IO object until the HTTP/2 preface has been matched in full (HTTP/2)
    /// or the stream ends / diverges from the preface (HTTP/1).
    ///
    /// Returns an `Interrupted` error if `cancel` was called, and propagates read errors.
    /// Panics if polled again after it has completed successfully, because the IO object
    /// has been taken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if *this.cancelled {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
        }

        let mut buf = ReadBuf::uninit(&mut *this.buf);
        // SAFETY: `filled` is only ever assigned `buf.filled().len()` after a read in a
        // previous poll, so that many leading bytes of `this.buf` have already been written.
        // Advancing the fresh cursor by `filled` restores the progress made so far.
        unsafe {
            buf.unfilled().advance(*this.filled);
        };

        // We start as H2 and switch to H1 as soon as we don't have the preface.
        while buf.filled().len() < H2_PREFACE.len() {
            let len = buf.filled().len();
            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
            // Persist progress so a later poll (after `Pending`) resumes where we stopped.
            *this.filled = buf.filled().len();

            // A read that added no bytes (end of stream), or newly read bytes that differ
            // from the corresponding slice of the preface, mean this is not HTTP/2.
            if buf.filled().len() == len
                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
            {
                *this.version = Version::H1;
                break;
            }
        }

        let io = this.io.take().unwrap();
        // Copy the consumed bytes out of the scratch buffer and pass them on with the IO.
        let buf = buf.filled().to_vec();
        Poll::Ready(Ok((
            *this.version,
            Rewind::new_buffered(io, Bytes::from(buf)),
        )))
    }
}
319
// Future returned by `Builder::serve_connection`. It either detects the protocol first or
// directly drives an HTTP/1 or HTTP/2 connection, depending on its internal state.
pin_project! {
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current stage of the connection: version detection, HTTP/1, or HTTP/2.
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
330
/// Minimal borrowed-or-owned holder that does not require `T: Clone`.
///
/// Both variants give shared access to the inner `T` through `Deref`.
enum Cow<'a, T> {
    /// A reference to a value that lives elsewhere.
    Borrowed(&'a T),
    /// A value stored inline.
    Owned(T),
}

impl<T> std::ops::Deref for Cow<'_, T> {
    type Target = T;

    /// Returns a shared reference to the inner value, whichever variant holds it.
    fn deref(&self) -> &Self::Target {
        match *self {
            Cow::Borrowed(borrowed) => borrowed,
            Cow::Owned(ref owned) => owned,
        }
    }
}
346
/// hyper's HTTP/1 server connection over the rewindable IO wrapper.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;

/// Placeholder that keeps the type parameters in use when the `http1` feature is disabled.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);

/// hyper's HTTP/2 server connection over the rewindable IO wrapper.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;

/// Placeholder that keeps the type parameters in use when the `http2` feature is disabled.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
358
// Internal state machine behind `Connection`.
pin_project! {
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Protocol not known yet: waiting for `read_version` to finish. The builder and
        // service are kept so the real connection can be created afterwards.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving as HTTP/1.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
381
382impl<I, S, E, B> Connection<'_, I, S, E>
383where
384 S: HttpService<Incoming, ResBody = B>,
385 S::Error: Into<Box<dyn StdError + Send + Sync>>,
386 I: Read + Write + Unpin,
387 B: Body + 'static,
388 B::Error: Into<Box<dyn StdError + Send + Sync>>,
389 E: HttpServerConnExec<S::Future, B>,
390{
391 pub fn graceful_shutdown(self: Pin<&mut Self>) {
400 match self.project().state.project() {
401 ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
402 #[cfg(feature = "http1")]
403 ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
404 #[cfg(feature = "http2")]
405 ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
406 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
407 _ => unreachable!(),
408 }
409 }
410
411 pub fn into_owned(self) -> Connection<'static, I, S, E>
413 where
414 Builder<E>: Clone,
415 {
416 Connection {
417 state: match self.state {
418 ConnState::ReadVersion {
419 read_version,
420 builder,
421 service,
422 } => ConnState::ReadVersion {
423 read_version,
424 service,
425 builder: Cow::Owned(builder.clone()),
426 },
427 #[cfg(feature = "http1")]
428 ConnState::H1 { conn } => ConnState::H1 { conn },
429 #[cfg(feature = "http2")]
430 ConnState::H2 { conn } => ConnState::H2 { conn },
431 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
432 _ => unreachable!(),
433 },
434 }
435 }
436}
437
438impl<I, S, E, B> Future for Connection<'_, I, S, E>
439where
440 S: Service<Request<Incoming>, Response = Response<B>>,
441 S::Future: 'static,
442 S::Error: Into<Box<dyn StdError + Send + Sync>>,
443 B: Body + 'static,
444 B::Error: Into<Box<dyn StdError + Send + Sync>>,
445 I: Read + Write + Unpin + 'static,
446 E: HttpServerConnExec<S::Future, B>,
447{
448 type Output = Result<()>;
449
450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451 loop {
452 let mut this = self.as_mut().project();
453
454 match this.state.as_mut().project() {
455 ConnStateProj::ReadVersion {
456 read_version,
457 builder,
458 service,
459 } => {
460 let (version, io) = ready!(read_version.poll(cx))?;
461 let service = service.take().unwrap();
462 match version {
463 #[cfg(feature = "http1")]
464 Version::H1 => {
465 let conn = builder.http1.serve_connection(io, service);
466 this.state.set(ConnState::H1 { conn });
467 }
468 #[cfg(feature = "http2")]
469 Version::H2 => {
470 let conn = builder.http2.serve_connection(io, service);
471 this.state.set(ConnState::H2 { conn });
472 }
473 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
474 _ => return Poll::Ready(Err(version.unsupported())),
475 }
476 }
477 #[cfg(feature = "http1")]
478 ConnStateProj::H1 { conn } => {
479 return conn.poll(cx).map_err(Into::into);
480 }
481 #[cfg(feature = "http2")]
482 ConnStateProj::H2 { conn } => {
483 return conn.poll(cx).map_err(Into::into);
484 }
485 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
486 _ => unreachable!(),
487 }
488 }
489 }
490}
491
// Future returned by `Builder::serve_connection_with_upgrades`. Same staging as
// `Connection`, but HTTP/1 is served through hyper's upgrade-capable connection type.
pin_project! {
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current stage of the connection: version detection, HTTP/1, or HTTP/2.
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
502
/// hyper's upgrade-capable HTTP/1 server connection.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;

/// Placeholder that keeps the type parameters in use when the `http1` feature is disabled.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
508
// Internal state machine behind `UpgradeableConnection`.
pin_project! {
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Protocol not known yet: waiting for `read_version` to finish. The builder and
        // service are kept so the real connection can be created afterwards.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving as HTTP/1 with upgrade support, over the rewindable IO wrapper.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
531
532impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
533where
534 S: HttpService<Incoming, ResBody = B>,
535 S::Error: Into<Box<dyn StdError + Send + Sync>>,
536 I: Read + Write + Unpin,
537 B: Body + 'static,
538 B::Error: Into<Box<dyn StdError + Send + Sync>>,
539 E: HttpServerConnExec<S::Future, B>,
540{
541 pub fn graceful_shutdown(self: Pin<&mut Self>) {
550 match self.project().state.project() {
551 UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
552 #[cfg(feature = "http1")]
553 UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
554 #[cfg(feature = "http2")]
555 UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
556 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
557 _ => unreachable!(),
558 }
559 }
560
561 pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
563 where
564 Builder<E>: Clone,
565 {
566 UpgradeableConnection {
567 state: match self.state {
568 UpgradeableConnState::ReadVersion {
569 read_version,
570 builder,
571 service,
572 } => UpgradeableConnState::ReadVersion {
573 read_version,
574 service,
575 builder: Cow::Owned(builder.clone()),
576 },
577 #[cfg(feature = "http1")]
578 UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
579 #[cfg(feature = "http2")]
580 UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
581 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
582 _ => unreachable!(),
583 },
584 }
585 }
586}
587
588impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
589where
590 S: Service<Request<Incoming>, Response = Response<B>>,
591 S::Future: 'static,
592 S::Error: Into<Box<dyn StdError + Send + Sync>>,
593 B: Body + 'static,
594 B::Error: Into<Box<dyn StdError + Send + Sync>>,
595 I: Read + Write + Unpin + Send + 'static,
596 E: HttpServerConnExec<S::Future, B>,
597{
598 type Output = Result<()>;
599
600 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
601 loop {
602 let mut this = self.as_mut().project();
603
604 match this.state.as_mut().project() {
605 UpgradeableConnStateProj::ReadVersion {
606 read_version,
607 builder,
608 service,
609 } => {
610 let (version, io) = ready!(read_version.poll(cx))?;
611 let service = service.take().unwrap();
612 match version {
613 #[cfg(feature = "http1")]
614 Version::H1 => {
615 let conn = builder.http1.serve_connection(io, service).with_upgrades();
616 this.state.set(UpgradeableConnState::H1 { conn });
617 }
618 #[cfg(feature = "http2")]
619 Version::H2 => {
620 let conn = builder.http2.serve_connection(io, service);
621 this.state.set(UpgradeableConnState::H2 { conn });
622 }
623 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
624 _ => return Poll::Ready(Err(version.unsupported())),
625 }
626 }
627 #[cfg(feature = "http1")]
628 UpgradeableConnStateProj::H1 { conn } => {
629 return conn.poll(cx).map_err(Into::into);
630 }
631 #[cfg(feature = "http2")]
632 UpgradeableConnStateProj::H2 { conn } => {
633 return conn.poll(cx).map_err(Into::into);
634 }
635 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
636 _ => unreachable!(),
637 }
638 }
639 }
640}
641
/// View of a [`Builder`] for configuring its HTTP/1 options; obtained from
/// `Builder::http1` or `Http2Builder::http1`.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    // The builder being configured.
    inner: &'a mut Builder<E>,
}
647
/// HTTP/1 configuration methods. Each setter forwards its argument to the same-named
/// method on the wrapped `hyper::server::conn::http1::Builder` and returns `self` for
/// chaining; see hyper's documentation of that builder for the exact semantics and defaults.
#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Switches to configuring the HTTP/2 options of the same underlying builder.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Forwards `enabled` to the HTTP/1 builder's `auto_date_header`.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.auto_date_header(enabled);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `half_close`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `keep_alive`.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `title_case_headers`.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `ignore_invalid_headers`.
    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.ignore_invalid_headers(enabled);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `preserve_header_case`.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `max_headers`.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Forwards `read_timeout` to the HTTP/1 builder's `header_read_timeout`.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `writev`.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Forwards `max` to the HTTP/1 builder's `max_buf_size`.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `pipeline_flush`.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Forwards `timer` to the HTTP/1 builder's `timer`.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Serves `io` with `service` by awaiting the future returned from the underlying
    /// builder's `serve_connection`.
    #[cfg(feature = "http2")]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Serves `io` with `service` by awaiting the future returned from the underlying
    /// builder's `serve_connection`.
    ///
    /// This variant is compiled when the `http2` feature is disabled; it carries no
    /// executor bound because `HttpServerConnExec` is then implemented for every type.
    #[cfg(not(feature = "http2"))]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Returns the future produced by the underlying builder's
    /// `serve_connection_with_upgrades`; `I` must additionally be `Send`.
    #[cfg(feature = "http2")]
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
860
/// View of a [`Builder`] for configuring its HTTP/2 options; obtained from
/// `Builder::http2` or `Http1Builder::http2`.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    // The builder being configured.
    inner: &'a mut Builder<E>,
}
866
/// HTTP/2 configuration methods. Each setter forwards its argument to the same-named
/// method on the wrapped `hyper::server::conn::http2::Builder` and returns `self` for
/// chaining; see hyper's documentation of that builder for the exact semantics and defaults.
#[cfg(feature = "http2")]
impl<E> Http2Builder<'_, E> {
    /// Switches to configuring the HTTP/1 options of the same underlying builder.
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        Http1Builder { inner: self.inner }
    }

    /// Forwards `max` to the HTTP/2 builder's `max_pending_accept_reset_streams`.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_pending_accept_reset_streams(max);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_local_error_reset_streams`.
    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_local_error_reset_streams(max);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `initial_stream_window_size`.
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_stream_window_size(sz);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `initial_connection_window_size`.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_connection_window_size(sz);
        self
    }

    /// Forwards `enabled` to the HTTP/2 builder's `adaptive_window`.
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.adaptive_window(enabled);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `max_frame_size`.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_frame_size(sz);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_concurrent_streams`.
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_concurrent_streams(max);
        self
    }

    /// Forwards `interval` to the HTTP/2 builder's `keep_alive_interval`.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http2.keep_alive_interval(interval);
        self
    }

    /// Forwards `timeout` to the HTTP/2 builder's `keep_alive_timeout`.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.inner.http2.keep_alive_timeout(timeout);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_send_buf_size`.
    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http2.max_send_buf_size(max);
        self
    }

    /// Calls the HTTP/2 builder's `enable_connect_protocol`.
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        self.inner.http2.enable_connect_protocol();
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_header_list_size`.
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.inner.http2.max_header_list_size(max);
        self
    }

    /// Forwards `timer` to the HTTP/2 builder's `timer`.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http2.timer(timer);
        self
    }

    /// Forwards `enabled` to the HTTP/2 builder's `auto_date_header`.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.auto_date_header(enabled);
        self
    }

    /// Serves `io` with `service` by awaiting the future returned from the underlying
    /// builder's `serve_connection`.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Returns the future produced by the underlying builder's
    /// `serve_connection_with_upgrades`; `I` must additionally be `Send`.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
1062
// Integration-style tests: each one starts a local TCP server using the auto builder and
// talks to it with hyper's HTTP/1 or HTTP/2 client.
#[cfg(test)]
mod tests {
    use crate::{
        rt::{TokioExecutor, TokioIo},
        server::conn::auto,
    };
    use http::{Request, Response};
    use http_body::Body;
    use http_body_util::{BodyExt, Empty, Full};
    use hyper::{body, body::Bytes, client, service::service_fn};
    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
    use tokio::{
        net::{TcpListener, TcpStream},
        pin,
    };

    // Response body returned by the `hello` service.
    const BODY: &[u8] = b"Hello, world!";

    // Compile-level check that the HTTP/1 and HTTP/2 sub-builders can be used both
    // chained from one another and separately on a named builder.
    #[test]
    fn configuration() {
        // One can hop between the sub-builders in a single chain.
        auto::Builder::new(TokioExecutor::new())
            .http1()
            .keep_alive(true)
            .http2()
            .keep_alive_interval(None);
        let mut builder = auto::Builder::new(TokioExecutor::new());

        // Or configure each protocol in a separate statement.
        builder.http1().keep_alive(true);
        builder.http2().keep_alive_interval(None);
    }

    // An unrestricted server answers an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // An unrestricted server answers an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // A server restricted to HTTP/2 answers an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // A server restricted to HTTP/2 makes an HTTP/1 client's request fail.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only_fail_if_client_is_http1() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h1(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // A server restricted to HTTP/1 answers an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // A server restricted to HTTP/1 makes an HTTP/2 client's request fail.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only_fail_if_client_is_http2() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h2(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // Calling `graceful_shutdown` before any bytes arrive must make the connection future
    // finish promptly with an `Interrupted` I/O error.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn graceful_shutdown() {
        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
            .await
            .unwrap();

        let listener_addr = listener.local_addr().unwrap();

        // Spawn the accept so the connect below can complete.
        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
        // Only connect a stream; never send anything on it.
        let _stream = TcpStream::connect(listener_addr).await.unwrap();

        let (stream, _) = listen_task.await.unwrap();
        let stream = TokioIo::new(stream);
        let builder = auto::Builder::new(TokioExecutor::new());
        let connection = builder.serve_connection(stream, service_fn(hello));

        pin!(connection);

        connection.as_mut().graceful_shutdown();

        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
            .await
            .expect("Connection should have finished in a timely manner after graceful shutdown.")
            .expect_err("Connection should have been interrupted.");

        let connection_error = connection_error
            .downcast_ref::<std::io::Error>()
            .expect("The error should have been `std::io::Error`.");
        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
    }

    // Opens a TCP connection to `addr`, performs the HTTP/1 client handshake, spawns the
    // client connection task, and returns the request sender.
    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
    where
        B: Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();

        tokio::spawn(connection);

        sender
    }

    // Opens a TCP connection to `addr`, performs the HTTP/2 client handshake, spawns the
    // client connection task, and returns the request sender.
    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
    where
        B: Body + Unpin + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
            .handshake(stream)
            .await
            .unwrap();

        tokio::spawn(connection);

        sender
    }

    // Binds a listener on an ephemeral local port and spawns an accept loop that serves
    // every connection with the `hello` service. `h1_only` / `h2_only` restrict the builder
    // to one protocol; with neither set, the upgrade-capable auto path is used.
    // Returns the address the server is listening on.
    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(addr).await.unwrap();

        let local_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            loop {
                let (stream, _) = listener.accept().await.unwrap();
                let stream = TokioIo::new(stream);
                tokio::task::spawn(async move {
                    let mut builder = auto::Builder::new(TokioExecutor::new());
                    if h1_only {
                        builder = builder.http1_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else if h2_only {
                        builder = builder.http2_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else {
                        builder
                            .http2()
                            .max_header_list_size(4096)
                            .serve_connection_with_upgrades(stream, service_fn(hello))
                            .await
                    }
                    .unwrap();
                });
            }
        });

        local_addr
    }

    // Service used by all tests: ignores the request and replies with `BODY`.
    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
        Ok(Response::new(Full::new(Bytes::from(BODY))))
    }
}