rama_http_backend/server/layer/upgrade/
service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! upgrade service to handle branching into http upgrade services
//!
//! See [`UpgradeService`] for more details.

use super::Upgraded;
use rama_core::{context::Extensions, matcher::Matcher, service::BoxService, Context, Service};
use rama_http_types::Request;
use rama_utils::macros::define_inner_service_accessors;
use std::{convert::Infallible, fmt, sync::Arc};

/// Upgrade service can be used to handle the possibility of upgrading a request,
/// after which it will pass down the transport RW to the attached upgrade service.
pub struct UpgradeService<S, State, O> {
    handlers: Vec<Arc<UpgradeHandler<State, O>>>,
    inner: S,
}

/// UpgradeHandler is a helper struct used internally to create an upgrade service.
pub struct UpgradeHandler<S, O> {
    matcher: Box<dyn Matcher<S, Request>>,
    responder: BoxService<S, Request, (O, Context<S>, Request), O>,
    handler: Arc<BoxService<S, Upgraded, (), Infallible>>,
    _phantom: std::marker::PhantomData<fn(S, O) -> ()>,
}

impl<S, O> UpgradeHandler<S, O> {
    /// Create a new upgrade handler.
    pub(crate) fn new<M, R, H>(matcher: M, responder: R, handler: H) -> Self
    where
        M: Matcher<S, Request>,
        R: Service<S, Request, Response = (O, Context<S>, Request), Error = O> + Clone,
        H: Service<S, Upgraded, Response = (), Error = Infallible> + Clone,
    {
        Self {
            matcher: Box::new(matcher),
            responder: responder.boxed(),
            handler: Arc::new(handler.boxed()),
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<S, State, O> UpgradeService<S, State, O> {
    /// Create a new [`UpgradeService`].
    pub const fn new(handlers: Vec<Arc<UpgradeHandler<State, O>>>, inner: S) -> Self {
        Self { handlers, inner }
    }

    define_inner_service_accessors!();
}

impl<S, State, O> fmt::Debug for UpgradeService<S, State, O>
where
    S: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UpgradeService")
            .field("handlers", &self.handlers)
            .field("inner", &self.inner)
            .finish()
    }
}

impl<S, State, O> Clone for UpgradeService<S, State, O>
where
    S: Clone,
{
    fn clone(&self) -> Self {
        Self {
            handlers: self.handlers.clone(),
            inner: self.inner.clone(),
        }
    }
}

impl<S, State, O, E> Service<State, Request> for UpgradeService<S, State, O>
where
    State: Clone + Send + Sync + 'static,
    S: Service<State, Request, Response = O, Error = E>,
    O: Send + Sync + 'static,
    E: Send + Sync + 'static,
{
    type Response = O;
    type Error = E;

    async fn serve(
        &self,
        mut ctx: Context<State>,
        req: Request,
    ) -> Result<Self::Response, Self::Error> {
        let mut ext = Extensions::new();
        for handler in &self.handlers {
            if !handler.matcher.matches(Some(&mut ext), &ctx, &req) {
                ext.clear();
                continue;
            }
            ctx.extend(ext);
            let exec = ctx.executor().clone();
            return match handler.responder.serve(ctx, req).await {
                Ok((resp, ctx, mut req)) => {
                    let handler = handler.handler.clone();
                    exec.spawn_task(async move {
                        match rama_http_core::upgrade::on(&mut req).await {
                            Ok(upgraded) => {
                                let _ = handler.serve(ctx, upgraded).await;
                            }
                            Err(e) => {
                                // TODO: do we need to allow the user to hook into this?
                                tracing::error!(error = %e, "upgrade error");
                            }
                        }
                    });
                    Ok(resp)
                }
                Err(e) => Ok(e),
            };
        }
        self.inner.serve(ctx, req).await
    }
}

impl<S, O> fmt::Debug for UpgradeHandler<S, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UpgradeHandler").finish()
    }
}