hyper_util/server/conn/auto/
mod.rs

1//! Http1 or Http2 connection.
2
3pub mod upgrade;
4
5use futures_util::ready;
6use hyper::service::HttpService;
7use std::future::Future;
8use std::marker::PhantomPinned;
9use std::mem::MaybeUninit;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::{error::Error as StdError, io, time::Duration};
13
14use bytes::Bytes;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18    body::Incoming,
19    rt::{Read, ReadBuf, Timer, Write},
20    service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
36type Error = Box<dyn std::error::Error + Send + Sync>;
37
38type Result<T> = std::result::Result<T, Error>;
39
40const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
42/// Exactly equivalent to [`Http2ServerConnExec`].
43#[cfg(feature = "http2")]
44pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
45
46#[cfg(feature = "http2")]
47impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
48
49/// Exactly equivalent to [`Http2ServerConnExec`].
50#[cfg(not(feature = "http2"))]
51pub trait HttpServerConnExec<A, B: Body> {}
52
53#[cfg(not(feature = "http2"))]
54impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
56/// Http1 or Http2 connection builder.
57#[derive(Clone, Debug)]
58pub struct Builder<E> {
59    #[cfg(feature = "http1")]
60    http1: http1::Builder,
61    #[cfg(feature = "http2")]
62    http2: http2::Builder<E>,
63    #[cfg(any(feature = "http1", feature = "http2"))]
64    version: Option<Version>,
65    #[cfg(not(feature = "http2"))]
66    _executor: E,
67}
68
69impl<E> Builder<E> {
70    /// Create a new auto connection builder.
71    ///
72    /// `executor` parameter should be a type that implements
73    /// [`Executor`](hyper::rt::Executor) trait.
74    ///
75    /// # Example
76    ///
77    /// ```
78    /// use hyper_util::{
79    ///     rt::TokioExecutor,
80    ///     server::conn::auto,
81    /// };
82    ///
83    /// auto::Builder::new(TokioExecutor::new());
84    /// ```
85    pub fn new(executor: E) -> Self {
86        Self {
87            #[cfg(feature = "http1")]
88            http1: http1::Builder::new(),
89            #[cfg(feature = "http2")]
90            http2: http2::Builder::new(executor),
91            #[cfg(any(feature = "http1", feature = "http2"))]
92            version: None,
93            #[cfg(not(feature = "http2"))]
94            _executor: executor,
95        }
96    }
97
98    /// Http1 configuration.
99    #[cfg(feature = "http1")]
100    pub fn http1(&mut self) -> Http1Builder<'_, E> {
101        Http1Builder { inner: self }
102    }
103
104    /// Http2 configuration.
105    #[cfg(feature = "http2")]
106    pub fn http2(&mut self) -> Http2Builder<'_, E> {
107        Http2Builder { inner: self }
108    }
109
110    /// Only accepts HTTP/2
111    ///
112    /// Does not do anything if used with [`serve_connection_with_upgrades`]
113    ///
114    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
115    #[cfg(feature = "http2")]
116    pub fn http2_only(mut self) -> Self {
117        assert!(self.version.is_none());
118        self.version = Some(Version::H2);
119        self
120    }
121
122    /// Only accepts HTTP/1
123    ///
124    /// Does not do anything if used with [`serve_connection_with_upgrades`]
125    ///
126    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
127    #[cfg(feature = "http1")]
128    pub fn http1_only(mut self) -> Self {
129        assert!(self.version.is_none());
130        self.version = Some(Version::H1);
131        self
132    }
133
134    /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
135    pub fn is_http1_available(&self) -> bool {
136        match self.version {
137            #[cfg(feature = "http1")]
138            Some(Version::H1) => true,
139            #[cfg(feature = "http2")]
140            Some(Version::H2) => false,
141            #[cfg(any(feature = "http1", feature = "http2"))]
142            _ => true,
143        }
144    }
145
146    /// Returns `true` if this builder can serve an HTTP/2-based connection.
147    pub fn is_http2_available(&self) -> bool {
148        match self.version {
149            #[cfg(feature = "http1")]
150            Some(Version::H1) => false,
151            #[cfg(feature = "http2")]
152            Some(Version::H2) => true,
153            #[cfg(any(feature = "http1", feature = "http2"))]
154            _ => true,
155        }
156    }
157
158    /// Bind a connection together with a [`Service`].
159    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
160    where
161        S: Service<Request<Incoming>, Response = Response<B>>,
162        S::Future: 'static,
163        S::Error: Into<Box<dyn StdError + Send + Sync>>,
164        B: Body + 'static,
165        B::Error: Into<Box<dyn StdError + Send + Sync>>,
166        I: Read + Write + Unpin + 'static,
167        E: HttpServerConnExec<S::Future, B>,
168    {
169        let state = match self.version {
170            #[cfg(feature = "http1")]
171            Some(Version::H1) => {
172                let io = Rewind::new_buffered(io, Bytes::new());
173                let conn = self.http1.serve_connection(io, service);
174                ConnState::H1 { conn }
175            }
176            #[cfg(feature = "http2")]
177            Some(Version::H2) => {
178                let io = Rewind::new_buffered(io, Bytes::new());
179                let conn = self.http2.serve_connection(io, service);
180                ConnState::H2 { conn }
181            }
182            #[cfg(any(feature = "http1", feature = "http2"))]
183            _ => ConnState::ReadVersion {
184                read_version: read_version(io),
185                builder: Cow::Borrowed(self),
186                service: Some(service),
187            },
188        };
189
190        Connection { state }
191    }
192
193    /// Bind a connection together with a [`Service`], with the ability to
194    /// handle HTTP upgrades. This requires that the IO object implements
195    /// `Send`.
196    ///
197    /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
198    /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
199    /// instead. See the documentation of the latter to understand why.
200    ///
201    /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
202    pub fn serve_connection_with_upgrades<I, S, B>(
203        &self,
204        io: I,
205        service: S,
206    ) -> UpgradeableConnection<'_, I, S, E>
207    where
208        S: Service<Request<Incoming>, Response = Response<B>>,
209        S::Future: 'static,
210        S::Error: Into<Box<dyn StdError + Send + Sync>>,
211        B: Body + 'static,
212        B::Error: Into<Box<dyn StdError + Send + Sync>>,
213        I: Read + Write + Unpin + Send + 'static,
214        E: HttpServerConnExec<S::Future, B>,
215    {
216        UpgradeableConnection {
217            state: UpgradeableConnState::ReadVersion {
218                read_version: read_version(io),
219                builder: Cow::Borrowed(self),
220                service: Some(service),
221            },
222        }
223    }
224}
225
226#[derive(Copy, Clone, Debug)]
227enum Version {
228    H1,
229    H2,
230}
231
232impl Version {
233    #[must_use]
234    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
235    pub fn unsupported(self) -> Error {
236        match self {
237            Version::H1 => Error::from("HTTP/1 is not supported"),
238            Version::H2 => Error::from("HTTP/2 is not supported"),
239        }
240    }
241}
242
243fn read_version<I>(io: I) -> ReadVersion<I>
244where
245    I: Read + Unpin,
246{
247    ReadVersion {
248        io: Some(io),
249        buf: [MaybeUninit::uninit(); 24],
250        filled: 0,
251        version: Version::H2,
252        cancelled: false,
253        _pin: PhantomPinned,
254    }
255}
256
257pin_project! {
258    struct ReadVersion<I> {
259        io: Option<I>,
260        buf: [MaybeUninit<u8>; 24],
261        // the amount of `buf` thats been filled
262        filled: usize,
263        version: Version,
264        cancelled: bool,
265        // Make this future `!Unpin` for compatibility with async trait methods.
266        #[pin]
267        _pin: PhantomPinned,
268    }
269}
270
271impl<I> ReadVersion<I> {
272    pub fn cancel(self: Pin<&mut Self>) {
273        *self.project().cancelled = true;
274    }
275}
276
277impl<I> Future for ReadVersion<I>
278where
279    I: Read + Unpin,
280{
281    type Output = io::Result<(Version, Rewind<I>)>;
282
283    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
284        let this = self.project();
285        if *this.cancelled {
286            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
287        }
288
289        let mut buf = ReadBuf::uninit(&mut *this.buf);
290        // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
291        // we're only advancing by that many.
292        unsafe {
293            buf.unfilled().advance(*this.filled);
294        };
295
296        // We start as H2 and switch to H1 as soon as we don't have the preface.
297        while buf.filled().len() < H2_PREFACE.len() {
298            let len = buf.filled().len();
299            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
300            *this.filled = buf.filled().len();
301
302            // We starts as H2 and switch to H1 when we don't get the preface.
303            if buf.filled().len() == len
304                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
305            {
306                *this.version = Version::H1;
307                break;
308            }
309        }
310
311        let io = this.io.take().unwrap();
312        let buf = buf.filled().to_vec();
313        Poll::Ready(Ok((
314            *this.version,
315            Rewind::new_buffered(io, Bytes::from(buf)),
316        )))
317    }
318}
319
320pin_project! {
321    /// Connection future.
322    pub struct Connection<'a, I, S, E>
323    where
324        S: HttpService<Incoming>,
325    {
326        #[pin]
327        state: ConnState<'a, I, S, E>,
328    }
329}
330
331// A custom COW, since the libstd is has ToOwned bounds that are too eager.
332enum Cow<'a, T> {
333    Borrowed(&'a T),
334    Owned(T),
335}
336
337impl<T> std::ops::Deref for Cow<'_, T> {
338    type Target = T;
339    fn deref(&self) -> &T {
340        match self {
341            Cow::Borrowed(t) => &*t,
342            Cow::Owned(ref t) => t,
343        }
344    }
345}
346
347#[cfg(feature = "http1")]
348type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
349
350#[cfg(not(feature = "http1"))]
351type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
352
353#[cfg(feature = "http2")]
354type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
355
356#[cfg(not(feature = "http2"))]
357type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
358
359pin_project! {
360    #[project = ConnStateProj]
361    enum ConnState<'a, I, S, E>
362    where
363        S: HttpService<Incoming>,
364    {
365        ReadVersion {
366            #[pin]
367            read_version: ReadVersion<I>,
368            builder: Cow<'a, Builder<E>>,
369            service: Option<S>,
370        },
371        H1 {
372            #[pin]
373            conn: Http1Connection<I, S>,
374        },
375        H2 {
376            #[pin]
377            conn: Http2Connection<I, S, E>,
378        },
379    }
380}
381
382impl<I, S, E, B> Connection<'_, I, S, E>
383where
384    S: HttpService<Incoming, ResBody = B>,
385    S::Error: Into<Box<dyn StdError + Send + Sync>>,
386    I: Read + Write + Unpin,
387    B: Body + 'static,
388    B::Error: Into<Box<dyn StdError + Send + Sync>>,
389    E: HttpServerConnExec<S::Future, B>,
390{
391    /// Start a graceful shutdown process for this connection.
392    ///
393    /// This `Connection` should continue to be polled until shutdown can finish.
394    ///
395    /// # Note
396    ///
397    /// This should only be called while the `Connection` future is still pending. If called after
398    /// `Connection::poll` has resolved, this does nothing.
399    pub fn graceful_shutdown(self: Pin<&mut Self>) {
400        match self.project().state.project() {
401            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
402            #[cfg(feature = "http1")]
403            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
404            #[cfg(feature = "http2")]
405            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
406            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
407            _ => unreachable!(),
408        }
409    }
410
411    /// Make this Connection static, instead of borrowing from Builder.
412    pub fn into_owned(self) -> Connection<'static, I, S, E>
413    where
414        Builder<E>: Clone,
415    {
416        Connection {
417            state: match self.state {
418                ConnState::ReadVersion {
419                    read_version,
420                    builder,
421                    service,
422                } => ConnState::ReadVersion {
423                    read_version,
424                    service,
425                    builder: Cow::Owned(builder.clone()),
426                },
427                #[cfg(feature = "http1")]
428                ConnState::H1 { conn } => ConnState::H1 { conn },
429                #[cfg(feature = "http2")]
430                ConnState::H2 { conn } => ConnState::H2 { conn },
431                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
432                _ => unreachable!(),
433            },
434        }
435    }
436}
437
438impl<I, S, E, B> Future for Connection<'_, I, S, E>
439where
440    S: Service<Request<Incoming>, Response = Response<B>>,
441    S::Future: 'static,
442    S::Error: Into<Box<dyn StdError + Send + Sync>>,
443    B: Body + 'static,
444    B::Error: Into<Box<dyn StdError + Send + Sync>>,
445    I: Read + Write + Unpin + 'static,
446    E: HttpServerConnExec<S::Future, B>,
447{
448    type Output = Result<()>;
449
450    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451        loop {
452            let mut this = self.as_mut().project();
453
454            match this.state.as_mut().project() {
455                ConnStateProj::ReadVersion {
456                    read_version,
457                    builder,
458                    service,
459                } => {
460                    let (version, io) = ready!(read_version.poll(cx))?;
461                    let service = service.take().unwrap();
462                    match version {
463                        #[cfg(feature = "http1")]
464                        Version::H1 => {
465                            let conn = builder.http1.serve_connection(io, service);
466                            this.state.set(ConnState::H1 { conn });
467                        }
468                        #[cfg(feature = "http2")]
469                        Version::H2 => {
470                            let conn = builder.http2.serve_connection(io, service);
471                            this.state.set(ConnState::H2 { conn });
472                        }
473                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
474                        _ => return Poll::Ready(Err(version.unsupported())),
475                    }
476                }
477                #[cfg(feature = "http1")]
478                ConnStateProj::H1 { conn } => {
479                    return conn.poll(cx).map_err(Into::into);
480                }
481                #[cfg(feature = "http2")]
482                ConnStateProj::H2 { conn } => {
483                    return conn.poll(cx).map_err(Into::into);
484                }
485                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
486                _ => unreachable!(),
487            }
488        }
489    }
490}
491
492pin_project! {
493    /// Connection future.
494    pub struct UpgradeableConnection<'a, I, S, E>
495    where
496        S: HttpService<Incoming>,
497    {
498        #[pin]
499        state: UpgradeableConnState<'a, I, S, E>,
500    }
501}
502
503#[cfg(feature = "http1")]
504type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
505
506#[cfg(not(feature = "http1"))]
507type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
508
509pin_project! {
510    #[project = UpgradeableConnStateProj]
511    enum UpgradeableConnState<'a, I, S, E>
512    where
513        S: HttpService<Incoming>,
514    {
515        ReadVersion {
516            #[pin]
517            read_version: ReadVersion<I>,
518            builder: Cow<'a, Builder<E>>,
519            service: Option<S>,
520        },
521        H1 {
522            #[pin]
523            conn: Http1UpgradeableConnection<Rewind<I>, S>,
524        },
525        H2 {
526            #[pin]
527            conn: Http2Connection<I, S, E>,
528        },
529    }
530}
531
532impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
533where
534    S: HttpService<Incoming, ResBody = B>,
535    S::Error: Into<Box<dyn StdError + Send + Sync>>,
536    I: Read + Write + Unpin,
537    B: Body + 'static,
538    B::Error: Into<Box<dyn StdError + Send + Sync>>,
539    E: HttpServerConnExec<S::Future, B>,
540{
541    /// Start a graceful shutdown process for this connection.
542    ///
543    /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
544    ///
545    /// # Note
546    ///
547    /// This should only be called while the `Connection` future is still nothing. pending. If
548    /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
549    pub fn graceful_shutdown(self: Pin<&mut Self>) {
550        match self.project().state.project() {
551            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
552            #[cfg(feature = "http1")]
553            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
554            #[cfg(feature = "http2")]
555            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
556            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
557            _ => unreachable!(),
558        }
559    }
560
561    /// Make this Connection static, instead of borrowing from Builder.
562    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
563    where
564        Builder<E>: Clone,
565    {
566        UpgradeableConnection {
567            state: match self.state {
568                UpgradeableConnState::ReadVersion {
569                    read_version,
570                    builder,
571                    service,
572                } => UpgradeableConnState::ReadVersion {
573                    read_version,
574                    service,
575                    builder: Cow::Owned(builder.clone()),
576                },
577                #[cfg(feature = "http1")]
578                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
579                #[cfg(feature = "http2")]
580                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
581                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
582                _ => unreachable!(),
583            },
584        }
585    }
586}
587
588impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
589where
590    S: Service<Request<Incoming>, Response = Response<B>>,
591    S::Future: 'static,
592    S::Error: Into<Box<dyn StdError + Send + Sync>>,
593    B: Body + 'static,
594    B::Error: Into<Box<dyn StdError + Send + Sync>>,
595    I: Read + Write + Unpin + Send + 'static,
596    E: HttpServerConnExec<S::Future, B>,
597{
598    type Output = Result<()>;
599
600    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
601        loop {
602            let mut this = self.as_mut().project();
603
604            match this.state.as_mut().project() {
605                UpgradeableConnStateProj::ReadVersion {
606                    read_version,
607                    builder,
608                    service,
609                } => {
610                    let (version, io) = ready!(read_version.poll(cx))?;
611                    let service = service.take().unwrap();
612                    match version {
613                        #[cfg(feature = "http1")]
614                        Version::H1 => {
615                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
616                            this.state.set(UpgradeableConnState::H1 { conn });
617                        }
618                        #[cfg(feature = "http2")]
619                        Version::H2 => {
620                            let conn = builder.http2.serve_connection(io, service);
621                            this.state.set(UpgradeableConnState::H2 { conn });
622                        }
623                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
624                        _ => return Poll::Ready(Err(version.unsupported())),
625                    }
626                }
627                #[cfg(feature = "http1")]
628                UpgradeableConnStateProj::H1 { conn } => {
629                    return conn.poll(cx).map_err(Into::into);
630                }
631                #[cfg(feature = "http2")]
632                UpgradeableConnStateProj::H2 { conn } => {
633                    return conn.poll(cx).map_err(Into::into);
634                }
635                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
636                _ => unreachable!(),
637            }
638        }
639    }
640}
641
642/// Http1 part of builder.
643#[cfg(feature = "http1")]
644pub struct Http1Builder<'a, E> {
645    inner: &'a mut Builder<E>,
646}
647
648#[cfg(feature = "http1")]
649impl<E> Http1Builder<'_, E> {
650    /// Http2 configuration.
651    #[cfg(feature = "http2")]
652    pub fn http2(&mut self) -> Http2Builder<'_, E> {
653        Http2Builder { inner: self.inner }
654    }
655
656    /// Set whether the `date` header should be included in HTTP responses.
657    ///
658    /// Note that including the `date` header is recommended by RFC 7231.
659    ///
660    /// Default is true.
661    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
662        self.inner.http1.auto_date_header(enabled);
663        self
664    }
665
666    /// Set whether HTTP/1 connections should support half-closures.
667    ///
668    /// Clients can chose to shutdown their write-side while waiting
669    /// for the server to respond. Setting this to `true` will
670    /// prevent closing the connection immediately if `read`
671    /// detects an EOF in the middle of a request.
672    ///
673    /// Default is `false`.
674    pub fn half_close(&mut self, val: bool) -> &mut Self {
675        self.inner.http1.half_close(val);
676        self
677    }
678
679    /// Enables or disables HTTP/1 keep-alive.
680    ///
681    /// Default is true.
682    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
683        self.inner.http1.keep_alive(val);
684        self
685    }
686
687    /// Set whether HTTP/1 connections will write header names as title case at
688    /// the socket level.
689    ///
690    /// Note that this setting does not affect HTTP/2.
691    ///
692    /// Default is false.
693    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
694        self.inner.http1.title_case_headers(enabled);
695        self
696    }
697
698    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
699    ///
700    /// If this is enabled and a header line does not start with a valid header
701    /// name, or does not include a colon at all, the line will be silently ignored
702    /// and no error will be reported.
703    ///
704    /// Default is false.
705    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
706        self.inner.http1.ignore_invalid_headers(enabled);
707        self
708    }
709
710    /// Set whether to support preserving original header cases.
711    ///
712    /// Currently, this will record the original cases received, and store them
713    /// in a private extension on the `Request`. It will also look for and use
714    /// such an extension in any provided `Response`.
715    ///
716    /// Since the relevant extension is still private, there is no way to
717    /// interact with the original cases. The only effect this can have now is
718    /// to forward the cases in a proxy-like fashion.
719    ///
720    /// Note that this setting does not affect HTTP/2.
721    ///
722    /// Default is false.
723    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
724        self.inner.http1.preserve_header_case(enabled);
725        self
726    }
727
728    /// Set the maximum number of headers.
729    ///
730    /// When a request is received, the parser will reserve a buffer to store headers for optimal
731    /// performance.
732    ///
733    /// If server receives more headers than the buffer size, it responds to the client with
734    /// "431 Request Header Fields Too Large".
735    ///
736    /// The headers is allocated on the stack by default, which has higher performance. After
737    /// setting this value, headers will be allocated in heap memory, that is, heap memory
738    /// allocation will occur for each request, and there will be a performance drop of about 5%.
739    ///
740    /// Note that this setting does not affect HTTP/2.
741    ///
742    /// Default is 100.
743    pub fn max_headers(&mut self, val: usize) -> &mut Self {
744        self.inner.http1.max_headers(val);
745        self
746    }
747
748    /// Set a timeout for reading client request headers. If a client does not
749    /// transmit the entire header within this time, the connection is closed.
750    ///
751    /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if `header_read_timeout` is configured
752    /// without a [`Timer`].
753    ///
754    /// Pass `None` to disable.
755    ///
756    /// Default is currently 30 seconds, but do not depend on that.
757    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
758        self.inner.http1.header_read_timeout(read_timeout);
759        self
760    }
761
762    /// Set whether HTTP/1 connections should try to use vectored writes,
763    /// or always flatten into a single buffer.
764    ///
765    /// Note that setting this to false may mean more copies of body data,
766    /// but may also improve performance when an IO transport doesn't
767    /// support vectored writes well, such as most TLS implementations.
768    ///
769    /// Setting this to true will force hyper to use queued strategy
770    /// which may eliminate unnecessary cloning on some TLS backends
771    ///
772    /// Default is `auto`. In this mode hyper will try to guess which
773    /// mode to use
774    pub fn writev(&mut self, val: bool) -> &mut Self {
775        self.inner.http1.writev(val);
776        self
777    }
778
779    /// Set the maximum buffer size for the connection.
780    ///
781    /// Default is ~400kb.
782    ///
783    /// # Panics
784    ///
785    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
786    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
787        self.inner.http1.max_buf_size(max);
788        self
789    }
790
791    /// Aggregates flushes to better support pipelined responses.
792    ///
793    /// Experimental, may have bugs.
794    ///
795    /// Default is false.
796    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
797        self.inner.http1.pipeline_flush(enabled);
798        self
799    }
800
801    /// Set the timer used in background tasks.
802    pub fn timer<M>(&mut self, timer: M) -> &mut Self
803    where
804        M: Timer + Send + Sync + 'static,
805    {
806        self.inner.http1.timer(timer);
807        self
808    }
809
810    /// Bind a connection together with a [`Service`].
811    #[cfg(feature = "http2")]
812    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
813    where
814        S: Service<Request<Incoming>, Response = Response<B>>,
815        S::Future: 'static,
816        S::Error: Into<Box<dyn StdError + Send + Sync>>,
817        B: Body + 'static,
818        B::Error: Into<Box<dyn StdError + Send + Sync>>,
819        I: Read + Write + Unpin + 'static,
820        E: HttpServerConnExec<S::Future, B>,
821    {
822        self.inner.serve_connection(io, service).await
823    }
824
825    /// Bind a connection together with a [`Service`].
826    #[cfg(not(feature = "http2"))]
827    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
828    where
829        S: Service<Request<Incoming>, Response = Response<B>>,
830        S::Future: 'static,
831        S::Error: Into<Box<dyn StdError + Send + Sync>>,
832        B: Body + 'static,
833        B::Error: Into<Box<dyn StdError + Send + Sync>>,
834        I: Read + Write + Unpin + 'static,
835    {
836        self.inner.serve_connection(io, service).await
837    }
838
839    /// Bind a connection together with a [`Service`], with the ability to
840    /// handle HTTP upgrades. This requires that the IO object implements
841    /// `Send`.
842    #[cfg(feature = "http2")]
843    pub fn serve_connection_with_upgrades<I, S, B>(
844        &self,
845        io: I,
846        service: S,
847    ) -> UpgradeableConnection<'_, I, S, E>
848    where
849        S: Service<Request<Incoming>, Response = Response<B>>,
850        S::Future: 'static,
851        S::Error: Into<Box<dyn StdError + Send + Sync>>,
852        B: Body + 'static,
853        B::Error: Into<Box<dyn StdError + Send + Sync>>,
854        I: Read + Write + Unpin + Send + 'static,
855        E: HttpServerConnExec<S::Future, B>,
856    {
857        self.inner.serve_connection_with_upgrades(io, service)
858    }
859}
860
861/// Http2 part of builder.
862#[cfg(feature = "http2")]
863pub struct Http2Builder<'a, E> {
864    inner: &'a mut Builder<E>,
865}
866
867#[cfg(feature = "http2")]
868impl<E> Http2Builder<'_, E> {
869    #[cfg(feature = "http1")]
870    /// Http1 configuration.
871    pub fn http1(&mut self) -> Http1Builder<'_, E> {
872        Http1Builder { inner: self.inner }
873    }
874
875    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
876    ///
877    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
878    /// As of v0.4.0, it is 20.
879    ///
880    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
881    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
882        self.inner.http2.max_pending_accept_reset_streams(max);
883        self
884    }
885
886    /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent.
887    ///
888    /// If not set, hyper will use a default, currently of 1024.
889    ///
890    /// If `None` is supplied, hyper will not apply any limit.
891    /// This is not advised, as it can potentially expose servers to DOS vulnerabilities.
892    ///
893    /// See <https://rustsec.org/advisories/RUSTSEC-2024-0003.html> for more information.
894    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
895        self.inner.http2.max_local_error_reset_streams(max);
896        self
897    }
898
899    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
900    /// stream-level flow control.
901    ///
902    /// Passing `None` will do nothing.
903    ///
904    /// If not set, hyper will use a default.
905    ///
906    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
907    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
908        self.inner.http2.initial_stream_window_size(sz);
909        self
910    }
911
912    /// Sets the max connection-level flow control for HTTP2.
913    ///
914    /// Passing `None` will do nothing.
915    ///
916    /// If not set, hyper will use a default.
917    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
918        self.inner.http2.initial_connection_window_size(sz);
919        self
920    }
921
922    /// Sets whether to use an adaptive flow control.
923    ///
924    /// Enabling this will override the limits set in
925    /// `http2_initial_stream_window_size` and
926    /// `http2_initial_connection_window_size`.
927    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
928        self.inner.http2.adaptive_window(enabled);
929        self
930    }
931
932    /// Sets the maximum frame size to use for HTTP2.
933    ///
934    /// Passing `None` will do nothing.
935    ///
936    /// If not set, hyper will use a default.
937    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
938        self.inner.http2.max_frame_size(sz);
939        self
940    }
941
942    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
943    /// connections.
944    ///
945    /// Default is 200. Passing `None` will remove any limit.
946    ///
947    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
948    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
949        self.inner.http2.max_concurrent_streams(max);
950        self
951    }
952
953    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
954    /// connection alive.
955    ///
956    /// Pass `None` to disable HTTP2 keep-alive.
957    ///
958    /// Default is currently disabled.
959    ///
960    /// # Cargo Feature
961    ///
962    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
963        self.inner.http2.keep_alive_interval(interval);
964        self
965    }
966
967    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
968    ///
969    /// If the ping is not acknowledged within the timeout, the connection will
970    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
971    ///
972    /// Default is 20 seconds.
973    ///
974    /// # Cargo Feature
975    ///
976    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
977        self.inner.http2.keep_alive_timeout(timeout);
978        self
979    }
980
981    /// Set the maximum write buffer size for each HTTP/2 stream.
982    ///
983    /// Default is currently ~400KB, but may change.
984    ///
985    /// # Panics
986    ///
987    /// The value must be no larger than `u32::MAX`.
988    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
989        self.inner.http2.max_send_buf_size(max);
990        self
991    }
992
993    /// Enables the [extended CONNECT protocol].
994    ///
995    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
996    pub fn enable_connect_protocol(&mut self) -> &mut Self {
997        self.inner.http2.enable_connect_protocol();
998        self
999    }
1000
1001    /// Sets the max size of received header frames.
1002    ///
1003    /// Default is currently ~16MB, but may change.
1004    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
1005        self.inner.http2.max_header_list_size(max);
1006        self
1007    }
1008
1009    /// Set the timer used in background tasks.
1010    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1011    where
1012        M: Timer + Send + Sync + 'static,
1013    {
1014        self.inner.http2.timer(timer);
1015        self
1016    }
1017
1018    /// Set whether the `date` header should be included in HTTP responses.
1019    ///
1020    /// Note that including the `date` header is recommended by RFC 7231.
1021    ///
1022    /// Default is true.
1023    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
1024        self.inner.http2.auto_date_header(enabled);
1025        self
1026    }
1027
1028    /// Bind a connection together with a [`Service`].
1029    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
1030    where
1031        S: Service<Request<Incoming>, Response = Response<B>>,
1032        S::Future: 'static,
1033        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1034        B: Body + 'static,
1035        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1036        I: Read + Write + Unpin + 'static,
1037        E: HttpServerConnExec<S::Future, B>,
1038    {
1039        self.inner.serve_connection(io, service).await
1040    }
1041
1042    /// Bind a connection together with a [`Service`], with the ability to
1043    /// handle HTTP upgrades. This requires that the IO object implements
1044    /// `Send`.
1045    pub fn serve_connection_with_upgrades<I, S, B>(
1046        &self,
1047        io: I,
1048        service: S,
1049    ) -> UpgradeableConnection<'_, I, S, E>
1050    where
1051        S: Service<Request<Incoming>, Response = Response<B>>,
1052        S::Future: 'static,
1053        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1054        B: Body + 'static,
1055        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1056        I: Read + Write + Unpin + Send + 'static,
1057        E: HttpServerConnExec<S::Future, B>,
1058    {
1059        self.inner.serve_connection_with_upgrades(io, service)
1060    }
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065    use crate::{
1066        rt::{TokioExecutor, TokioIo},
1067        server::conn::auto,
1068    };
1069    use http::{Request, Response};
1070    use http_body::Body;
1071    use http_body_util::{BodyExt, Empty, Full};
1072    use hyper::{body, body::Bytes, client, service::service_fn};
1073    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
1074    use tokio::{
1075        net::{TcpListener, TcpStream},
1076        pin,
1077    };
1078
1079    const BODY: &[u8] = b"Hello, world!";
1080
1081    #[test]
1082    fn configuration() {
1083        // One liner.
1084        auto::Builder::new(TokioExecutor::new())
1085            .http1()
1086            .keep_alive(true)
1087            .http2()
1088            .keep_alive_interval(None);
1089        //  .serve_connection(io, service);
1090
1091        // Using variable.
1092        let mut builder = auto::Builder::new(TokioExecutor::new());
1093
1094        builder.http1().keep_alive(true);
1095        builder.http2().keep_alive_interval(None);
1096        // builder.serve_connection(io, service);
1097    }
1098
1099    #[cfg(not(miri))]
1100    #[tokio::test]
1101    async fn http1() {
1102        let addr = start_server(false, false).await;
1103        let mut sender = connect_h1(addr).await;
1104
1105        let response = sender
1106            .send_request(Request::new(Empty::<Bytes>::new()))
1107            .await
1108            .unwrap();
1109
1110        let body = response.into_body().collect().await.unwrap().to_bytes();
1111
1112        assert_eq!(body, BODY);
1113    }
1114
1115    #[cfg(not(miri))]
1116    #[tokio::test]
1117    async fn http2() {
1118        let addr = start_server(false, false).await;
1119        let mut sender = connect_h2(addr).await;
1120
1121        let response = sender
1122            .send_request(Request::new(Empty::<Bytes>::new()))
1123            .await
1124            .unwrap();
1125
1126        let body = response.into_body().collect().await.unwrap().to_bytes();
1127
1128        assert_eq!(body, BODY);
1129    }
1130
1131    #[cfg(not(miri))]
1132    #[tokio::test]
1133    async fn http2_only() {
1134        let addr = start_server(false, true).await;
1135        let mut sender = connect_h2(addr).await;
1136
1137        let response = sender
1138            .send_request(Request::new(Empty::<Bytes>::new()))
1139            .await
1140            .unwrap();
1141
1142        let body = response.into_body().collect().await.unwrap().to_bytes();
1143
1144        assert_eq!(body, BODY);
1145    }
1146
1147    #[cfg(not(miri))]
1148    #[tokio::test]
1149    async fn http2_only_fail_if_client_is_http1() {
1150        let addr = start_server(false, true).await;
1151        let mut sender = connect_h1(addr).await;
1152
1153        let _ = sender
1154            .send_request(Request::new(Empty::<Bytes>::new()))
1155            .await
1156            .expect_err("should fail");
1157    }
1158
1159    #[cfg(not(miri))]
1160    #[tokio::test]
1161    async fn http1_only() {
1162        let addr = start_server(true, false).await;
1163        let mut sender = connect_h1(addr).await;
1164
1165        let response = sender
1166            .send_request(Request::new(Empty::<Bytes>::new()))
1167            .await
1168            .unwrap();
1169
1170        let body = response.into_body().collect().await.unwrap().to_bytes();
1171
1172        assert_eq!(body, BODY);
1173    }
1174
1175    #[cfg(not(miri))]
1176    #[tokio::test]
1177    async fn http1_only_fail_if_client_is_http2() {
1178        let addr = start_server(true, false).await;
1179        let mut sender = connect_h2(addr).await;
1180
1181        let _ = sender
1182            .send_request(Request::new(Empty::<Bytes>::new()))
1183            .await
1184            .expect_err("should fail");
1185    }
1186
1187    #[cfg(not(miri))]
1188    #[tokio::test]
1189    async fn graceful_shutdown() {
1190        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
1191            .await
1192            .unwrap();
1193
1194        let listener_addr = listener.local_addr().unwrap();
1195
1196        // Spawn the task in background so that we can connect there
1197        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
1198        // Only connect a stream, do not send headers or anything
1199        let _stream = TcpStream::connect(listener_addr).await.unwrap();
1200
1201        let (stream, _) = listen_task.await.unwrap();
1202        let stream = TokioIo::new(stream);
1203        let builder = auto::Builder::new(TokioExecutor::new());
1204        let connection = builder.serve_connection(stream, service_fn(hello));
1205
1206        pin!(connection);
1207
1208        connection.as_mut().graceful_shutdown();
1209
1210        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
1211            .await
1212            .expect("Connection should have finished in a timely manner after graceful shutdown.")
1213            .expect_err("Connection should have been interrupted.");
1214
1215        let connection_error = connection_error
1216            .downcast_ref::<std::io::Error>()
1217            .expect("The error should have been `std::io::Error`.");
1218        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
1219    }
1220
1221    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
1222    where
1223        B: Body + Send + 'static,
1224        B::Data: Send,
1225        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1226    {
1227        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1228        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();
1229
1230        tokio::spawn(connection);
1231
1232        sender
1233    }
1234
1235    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
1236    where
1237        B: Body + Unpin + Send + 'static,
1238        B::Data: Send,
1239        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1240    {
1241        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
1242        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
1243            .handshake(stream)
1244            .await
1245            .unwrap();
1246
1247        tokio::spawn(connection);
1248
1249        sender
1250    }
1251
1252    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
1253        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
1254        let listener = TcpListener::bind(addr).await.unwrap();
1255
1256        let local_addr = listener.local_addr().unwrap();
1257
1258        tokio::spawn(async move {
1259            loop {
1260                let (stream, _) = listener.accept().await.unwrap();
1261                let stream = TokioIo::new(stream);
1262                tokio::task::spawn(async move {
1263                    let mut builder = auto::Builder::new(TokioExecutor::new());
1264                    if h1_only {
1265                        builder = builder.http1_only();
1266                        builder.serve_connection(stream, service_fn(hello)).await
1267                    } else if h2_only {
1268                        builder = builder.http2_only();
1269                        builder.serve_connection(stream, service_fn(hello)).await
1270                    } else {
1271                        builder
1272                            .http2()
1273                            .max_header_list_size(4096)
1274                            .serve_connection_with_upgrades(stream, service_fn(hello))
1275                            .await
1276                    }
1277                    .unwrap();
1278                });
1279            }
1280        });
1281
1282        local_addr
1283    }
1284
1285    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
1286        Ok(Response::new(Full::new(Bytes::from(BODY))))
1287    }
1288}