zino_http/request/
mod.rs

1//! Request context and validation.
2
3use crate::{
4    helper,
5    response::{Rejection, Response, ResponseCode},
6};
7use multer::Multipart;
8use serde::de::DeserializeOwned;
9use std::{borrow::Cow, net::IpAddr, str::FromStr, time::Instant};
10use zino_channel::{CloudEvent, Subscription};
11use zino_core::{
12    JsonValue, Map, SharedString, Uuid,
13    application::Agent,
14    error::Error,
15    extension::HeaderMapExt,
16    model::{ModelHooks, Query},
17    trace::{TraceContext, TraceState},
18    warn,
19};
20use zino_storage::NamedFile;
21
22#[cfg(feature = "auth")]
23use zino_auth::{AccessKeyId, Authentication, ParseSecurityTokenError, SecurityToken, SessionId};
24
25#[cfg(feature = "auth")]
26use zino_core::{datetime::DateTime, extension::JsonObjectExt, validation::Validation};
27
28#[cfg(feature = "cookie")]
29use cookie::{Cookie, SameSite};
30
31#[cfg(feature = "jwt")]
32use jwt_simple::algorithms::MACLike;
33#[cfg(feature = "jwt")]
34use zino_auth::JwtClaims;
35
36#[cfg(any(feature = "cookie", feature = "jwt"))]
37use std::time::Duration;
38
39#[cfg(feature = "i18n")]
40use crate::i18n;
41#[cfg(feature = "i18n")]
42use fluent::FluentArgs;
43#[cfg(feature = "i18n")]
44use unic_langid::LanguageIdentifier;
45
46mod context;
47
48pub use context::Context;
49
50/// Request context.
51pub trait RequestContext {
52    /// The method type.
53    type Method: AsRef<str>;
54    /// The uri type.
55    type Uri;
56
57    /// Returns the request method.
58    fn request_method(&self) -> &Self::Method;
59
60    /// Returns the original request URI regardless of nesting.
61    fn original_uri(&self) -> &Self::Uri;
62
63    /// Returns the route that matches the request.
64    fn matched_route(&self) -> Cow<'_, str>;
65
66    /// Returns the request path regardless of nesting.
67    fn request_path(&self) -> &str;
68
69    /// Gets the query string of the request.
70    fn get_query_string(&self) -> Option<&str>;
71
72    /// Gets an HTTP header value with the given name.
73    fn get_header(&self, name: &str) -> Option<&str>;
74
75    /// Returns the client's remote IP.
76    fn client_ip(&self) -> Option<IpAddr>;
77
78    /// Gets the request context.
79    fn get_context(&self) -> Option<Context>;
80
81    /// Gets the request scoped data.
82    fn get_data<T: Clone + Send + Sync + 'static>(&self) -> Option<T>;
83
84    /// Sets the request scoped data and returns the old value
85    /// if an item of this type was already stored.
86    fn set_data<T: Clone + Send + Sync + 'static>(&mut self, value: T) -> Option<T>;
87
88    /// Reads the entire request body into a byte buffer.
89    async fn read_body_bytes(&mut self) -> Result<Vec<u8>, Error>;
90
91    /// Returns the request path segments.
92    #[inline]
93    fn path_segments(&self) -> Vec<&str> {
94        self.request_path().trim_matches('/').split('/').collect()
95    }
96
97    /// Creates a new request context.
98    fn new_context(&self) -> Context {
99        // Emit metrics.
100        #[cfg(feature = "metrics")]
101        {
102            metrics::gauge!("zino_http_requests_in_flight").increment(1.0);
103            metrics::counter!(
104                "zino_http_requests_total",
105                "method" => self.request_method().as_ref().to_owned(),
106                "route" => self.matched_route().into_owned(),
107            )
108            .increment(1);
109        }
110
111        // Parse tracing headers.
112        let request_id = self
113            .get_header("x-request-id")
114            .and_then(|s| s.parse().ok())
115            .unwrap_or_else(Uuid::now_v7);
116        let trace_id = self
117            .get_trace_context()
118            .map_or_else(Uuid::now_v7, |t| Uuid::from_u128(t.trace_id()));
119        let session_id = self
120            .get_header("x-session-id")
121            .or_else(|| self.get_header("session_id"))
122            .and_then(|s| s.parse().ok());
123
124        // Generate new context.
125        let mut ctx = Context::new(request_id);
126        ctx.set_instance(self.request_path());
127        ctx.set_trace_id(trace_id);
128        ctx.set_session_id(session_id);
129
130        // Set locale.
131        #[cfg(feature = "i18n")]
132        {
133            #[cfg(feature = "cookie")]
134            if let Some(cookie) = self.get_cookie("locale") {
135                ctx.set_locale(cookie.value());
136                return ctx;
137            }
138
139            let supported_locales = i18n::SUPPORTED_LOCALES.as_slice();
140            let locale = self
141                .get_header("accept-language")
142                .and_then(|languages| helper::select_language(languages, supported_locales))
143                .unwrap_or(&i18n::DEFAULT_LOCALE);
144            ctx.set_locale(locale);
145        }
146        ctx
147    }
148
149    /// Returns the trace context by parsing the `traceparent` and `tracestate` header values.
150    #[inline]
151    fn get_trace_context(&self) -> Option<TraceContext> {
152        let traceparent = self.get_header("traceparent")?;
153        let mut trace_context = TraceContext::from_traceparent(traceparent)?;
154        if let Some(tracestate) = self.get_header("tracestate") {
155            *trace_context.trace_state_mut() = TraceState::from_tracestate(tracestate);
156        }
157        Some(trace_context)
158    }
159
160    /// Creates a new `TraceContext`.
161    fn new_trace_context(&self) -> TraceContext {
162        let mut trace_context = self
163            .get_trace_context()
164            .or_else(|| {
165                self.get_context()
166                    .map(|ctx| TraceContext::with_trace_id(ctx.trace_id()))
167            })
168            .map(|t| t.child())
169            .unwrap_or_default();
170        trace_context.record_trace_state();
171        trace_context
172    }
173
174    /// Creates a new cookie with the given name and value.
175    #[cfg(feature = "cookie")]
176    fn new_cookie(
177        &self,
178        name: SharedString,
179        value: SharedString,
180        max_age: Option<Duration>,
181    ) -> Cookie<'static> {
182        let mut cookie_builder = Cookie::build((name, value))
183            .http_only(true)
184            .secure(true)
185            .same_site(SameSite::Lax)
186            .path(self.request_path().to_owned());
187        if let Some(max_age) = max_age.and_then(|d| d.try_into().ok()) {
188            cookie_builder = cookie_builder.max_age(max_age);
189        }
190        cookie_builder.build()
191    }
192
193    /// Gets a cookie with the given name.
194    #[cfg(feature = "cookie")]
195    fn get_cookie(&self, name: &str) -> Option<Cookie<'_>> {
196        self.get_header("cookie")?.split(';').find_map(|cookie| {
197            if let Some((key, value)) = cookie.split_once('=') {
198                (key == name).then(|| Cookie::new(key, value))
199            } else {
200                None
201            }
202        })
203    }
204
205    /// Returns the start time.
206    #[inline]
207    fn start_time(&self) -> Instant {
208        self.get_context()
209            .map(|ctx| ctx.start_time())
210            .unwrap_or_else(Instant::now)
211    }
212
213    /// Returns the instance.
214    #[inline]
215    fn instance(&self) -> String {
216        self.get_context()
217            .map(|ctx| ctx.instance().to_owned())
218            .unwrap_or_else(|| self.request_path().to_owned())
219    }
220
221    /// Returns the request ID.
222    #[inline]
223    fn request_id(&self) -> Uuid {
224        self.get_context()
225            .map(|ctx| ctx.request_id())
226            .unwrap_or_default()
227    }
228
229    /// Returns the trace ID.
230    #[inline]
231    fn trace_id(&self) -> Uuid {
232        self.get_context()
233            .map(|ctx| ctx.trace_id())
234            .unwrap_or_default()
235    }
236
237    /// Returns the session ID.
238    #[inline]
239    fn session_id(&self) -> Option<String> {
240        self.get_context()
241            .and_then(|ctx| ctx.session_id().map(|s| s.to_owned()))
242    }
243
244    /// Returns the locale.
245    #[cfg(feature = "i18n")]
246    #[inline]
247    fn locale(&self) -> Option<LanguageIdentifier> {
248        self.get_context().and_then(|ctx| ctx.locale().cloned())
249    }
250
251    /// Gets the data type by parsing the `content-type` header.
252    ///
253    /// # Note
254    ///
255    /// Currently, we support the following values: `bytes` | `csv` | `form` | `json` | `multipart`
256    /// | `ndjson` | `text`.
257    fn data_type(&self) -> Option<&str> {
258        self.get_header("content-type")
259            .map(|content_type| {
260                if let Some((essence, _)) = content_type.split_once(';') {
261                    essence
262                } else {
263                    content_type
264                }
265            })
266            .map(helper::get_data_type)
267    }
268
269    /// Gets the route parameter by name.
270    /// The name should not include `:`, `*`, `{` or `}`.
271    ///
272    /// # Note
273    ///
274    /// Please note that it does not handle the percent-decoding.
275    /// You can use [`decode_param()`](Self::decode_param) or [`parse_param()`](Self::parse_param)
276    /// if you need percent-decoding.
277    fn get_param(&self, name: &str) -> Option<&str> {
278        const CAPTURES: [char; 4] = [':', '*', '{', '}'];
279        if let Some(index) = self
280            .matched_route()
281            .split('/')
282            .position(|segment| segment.trim_matches(CAPTURES.as_slice()) == name)
283        {
284            self.request_path().splitn(index + 2, '/').nth(index)
285        } else {
286            None
287        }
288    }
289
290    /// Decodes the UTF-8 percent-encoded route parameter by name.
291    fn decode_param(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
292        if let Some(value) = self.get_param(name) {
293            percent_encoding::percent_decode_str(value)
294                .decode_utf8()
295                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
296        } else {
297            Err(Rejection::from_validation_entry(
298                name.to_owned(),
299                warn!("param `{}` does not exist", name),
300            )
301            .context(self))
302        }
303    }
304
305    /// Parses the route parameter by name as an instance of type `T`.
306    /// The name should not include `:`, `*`, `{` or `}`.
307    fn parse_param<T: FromStr<Err: Into<Error>>>(&self, name: &str) -> Result<T, Rejection> {
308        if let Some(param) = self.get_param(name) {
309            percent_encoding::percent_decode_str(param)
310                .decode_utf8_lossy()
311                .parse::<T>()
312                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
313        } else {
314            Err(Rejection::from_validation_entry(
315                name.to_owned(),
316                warn!("param `{}` does not exist", name),
317            )
318            .context(self))
319        }
320    }
321
322    /// Gets the query value of the URI by name.
323    ///
324    /// # Note
325    ///
326    /// Please note that it does not handle the percent-decoding.
327    /// You can use [`decode_query()`](Self::decode_query) or [`parse_query()`](Self::parse_query)
328    /// if you need percent-decoding.
329    fn get_query(&self, name: &str) -> Option<&str> {
330        self.get_query_string()?.split('&').find_map(|param| {
331            if let Some((key, value)) = param.split_once('=') {
332                (key == name).then_some(value)
333            } else {
334                None
335            }
336        })
337    }
338
339    /// Decodes the UTF-8 percent-encoded query value of the URI by name.
340    fn decode_query(&self, name: &str) -> Result<Cow<'_, str>, Rejection> {
341        if let Some(value) = self.get_query(name) {
342            percent_encoding::percent_decode_str(value)
343                .decode_utf8()
344                .map_err(|err| Rejection::from_validation_entry(name.to_owned(), err).context(self))
345        } else {
346            Err(Rejection::from_validation_entry(
347                name.to_owned(),
348                warn!("query value `{}` does not exist", name),
349            )
350            .context(self))
351        }
352    }
353
354    /// Parses the query as an instance of type `T`.
355    /// Returns a default value of `T` when the query is empty.
356    /// If the query has a `timestamp` parameter, it will be used to prevent replay attacks.
357    fn parse_query<T: Default + DeserializeOwned>(&self) -> Result<T, Rejection> {
358        if let Some(query) = self.get_query_string() {
359            #[cfg(feature = "jwt")]
360            if let Some(timestamp) = self.get_query("timestamp").and_then(|s| s.parse().ok()) {
361                let duration = DateTime::from_timestamp(timestamp).span_between_now();
362                if duration > zino_auth::default_time_tolerance() {
363                    let err = warn!("timestamp `{}` can not be trusted", timestamp);
364                    let rejection = Rejection::from_validation_entry("timestamp", err);
365                    return Err(rejection.context(self));
366                }
367            }
368            serde_qs::from_str::<T>(query)
369                .map_err(|err| Rejection::from_validation_entry("query", err).context(self))
370        } else {
371            Ok(T::default())
372        }
373    }
374
375    /// Parses the request body as an instance of type `T`.
376    ///
377    /// # Note
378    ///
379    /// Currently, we have built-in support for the following `content-type` header values:
380    ///
381    /// - `application/json`
382    /// - `application/problem+json`
383    /// - `application/x-www-form-urlencoded`
384    async fn parse_body<T: DeserializeOwned>(&mut self) -> Result<T, Rejection> {
385        let data_type = self.data_type().unwrap_or("form");
386        if data_type.contains('/') {
387            let err = warn!(
388                "deserialization of the data type `{}` is unsupported",
389                data_type
390            );
391            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
392            return Err(rejection);
393        }
394
395        let is_form = data_type == "form";
396        let bytes = self
397            .read_body_bytes()
398            .await
399            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
400        if is_form {
401            serde_qs::from_bytes(&bytes)
402                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
403        } else {
404            serde_json::from_slice(&bytes)
405                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
406        }
407    }
408
409    /// Parses the request body as a multipart, which is commonly used with file uploads.
410    async fn parse_multipart(&mut self) -> Result<Multipart, Rejection> {
411        let Some(content_type) = self.get_header("content-type") else {
412            return Err(Rejection::from_validation_entry(
413                "content_type",
414                warn!("invalid `content-type` header"),
415            )
416            .context(self));
417        };
418        match multer::parse_boundary(content_type) {
419            Ok(boundary) => {
420                let result = self.read_body_bytes().await.map_err(|err| err.to_string());
421                let stream = futures::stream::once(async { result });
422                Ok(Multipart::new(stream, boundary))
423            }
424            Err(err) => Err(Rejection::from_validation_entry("boundary", err).context(self)),
425        }
426    }
427
428    /// Parses the request body as a file.
429    async fn parse_file(&mut self) -> Result<NamedFile, Rejection> {
430        let multipart = self.parse_multipart().await?;
431        NamedFile::try_from_multipart(multipart)
432            .await
433            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
434    }
435
436    /// Parses the request body as a list of files.
437    async fn parse_files(&mut self) -> Result<Vec<NamedFile>, Rejection> {
438        let multipart = self.parse_multipart().await?;
439        NamedFile::try_collect_from_multipart(multipart)
440            .await
441            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
442    }
443
444    /// Parses the multipart form as an instance of `T` with the `name` and a list of files.
445    async fn parse_form<T: DeserializeOwned>(
446        &mut self,
447        name: &str,
448    ) -> Result<(Option<T>, Vec<NamedFile>), Rejection> {
449        let multipart = self.parse_multipart().await?;
450        helper::parse_form(multipart, name)
451            .await
452            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
453    }
454
455    /// Parses the `multipart/form-data` as an instance of type `T` and a list of files.
456    async fn parse_form_data<T: DeserializeOwned>(
457        &mut self,
458    ) -> Result<(T, Vec<NamedFile>), Rejection> {
459        let multipart = self.parse_multipart().await?;
460        helper::parse_form_data(multipart)
461            .await
462            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))
463    }
464
465    /// Attempts to construct an instance of `Authentication` from an HTTP request.
466    /// The value is extracted from the query or the `authorization` header.
467    /// By default, the `Accept` header value is ignored and
468    /// the canonicalized resource is set to the request path.
469    #[cfg(feature = "auth")]
470    fn parse_authentication(&self) -> Result<Authentication, Rejection> {
471        let method = self.request_method();
472        let query = self.parse_query::<Map>().unwrap_or_default();
473        let mut authentication = Authentication::new(method.as_ref());
474        let mut validation = Validation::new();
475        if let Some(signature) = query.get_str("signature") {
476            authentication.set_signature(signature.to_owned());
477            if let Some(access_key_id) = query.parse_string("access_key_id") {
478                authentication.set_access_key_id(access_key_id);
479            } else {
480                validation.record("access_key_id", "should be nonempty");
481            }
482            if let Some(Ok(secs)) = query.parse_i64("expires") {
483                if DateTime::now().timestamp() <= secs {
484                    let expires = DateTime::from_timestamp(secs);
485                    authentication.set_expires(Some(expires));
486                } else {
487                    validation.record("expires", "valid period has expired");
488                }
489            } else {
490                validation.record("expires", "invalid timestamp");
491            }
492            if !validation.is_success() {
493                return Err(Rejection::bad_request(validation).context(self));
494            }
495        } else if let Some(authorization) = self.get_header("authorization") {
496            if let Some((service_name, token)) = authorization.split_once(' ') {
497                authentication.set_service_name(service_name);
498                if let Some((access_key_id, signature)) = token.split_once(':') {
499                    authentication.set_access_key_id(access_key_id);
500                    authentication.set_signature(signature.to_owned());
501                } else {
502                    validation.record("authorization", "invalid header value");
503                }
504            } else {
505                validation.record("authorization", "invalid service name");
506            }
507            if !validation.is_success() {
508                return Err(Rejection::bad_request(validation).context(self));
509            }
510        }
511        if let Some(content_md5) = self.get_header("content-md5") {
512            authentication.set_content_md5(content_md5.to_owned());
513        }
514        if let Some(date) = self.get_header("date") {
515            match DateTime::parse_utc_str(date) {
516                Ok(date) => {
517                    #[cfg(feature = "jwt")]
518                    if date.span_between_now() <= zino_auth::default_time_tolerance() {
519                        authentication.set_date_header("date", date);
520                    } else {
521                        validation.record("date", "untrusted date");
522                    }
523                    #[cfg(not(feature = "jwt"))]
524                    authentication.set_date_header("date", date);
525                }
526                Err(err) => {
527                    validation.record_fail("date", err);
528                    return Err(Rejection::bad_request(validation).context(self));
529                }
530            }
531        }
532        authentication.set_content_type(self.get_header("content-type").map(|s| s.to_owned()));
533        authentication.set_resource(self.request_path().to_owned(), None);
534        Ok(authentication)
535    }
536
537    /// Attempts to construct an instance of `AccessKeyId` from an HTTP request.
538    /// The value is extracted from the query parameter `access_key_id`
539    /// or the `authorization` header.
540    #[cfg(feature = "auth")]
541    fn parse_access_key_id(&self) -> Result<AccessKeyId, Rejection> {
542        if let Some(access_key_id) = self.get_query("access_key_id") {
543            Ok(access_key_id.into())
544        } else {
545            let mut validation = Validation::new();
546            if let Some(authorization) = self.get_header("authorization") {
547                if let Some((_, token)) = authorization.split_once(' ') {
548                    let access_key_id = if let Some((access_key_id, _)) = token.split_once(':') {
549                        access_key_id
550                    } else {
551                        token
552                    };
553                    return Ok(access_key_id.into());
554                } else {
555                    validation.record("authorization", "invalid service name");
556                }
557            } else {
558                validation.record("authorization", "invalid value to get the access key id");
559            }
560            Err(Rejection::bad_request(validation).context(self))
561        }
562    }
563
564    /// Attempts to construct an instance of `SecurityToken` from an HTTP request.
565    /// The value is extracted from the `x-security-token` header.
566    #[cfg(feature = "auth")]
567    fn parse_security_token(&self, key: &[u8]) -> Result<SecurityToken, Rejection> {
568        use ParseSecurityTokenError::*;
569        let query = self.parse_query::<Map>()?;
570        let mut validation = Validation::new();
571        if let Some(token) = self
572            .get_header("x-security-token")
573            .or_else(|| query.get_str("security_token"))
574        {
575            match SecurityToken::parse_with(token.to_owned(), key) {
576                Ok(security_token) => {
577                    if let Some(access_key_id) = query.get_str("access_key_id") {
578                        if security_token.access_key_id().as_str() != access_key_id {
579                            validation.record("access_key_id", "untrusted access key ID");
580                        }
581                    }
582                    if let Some(Ok(expires)) = query.parse_i64("expires") {
583                        if security_token.expires_at().timestamp() != expires {
584                            validation.record("expires", "untrusted timestamp");
585                        }
586                    }
587                    if validation.is_success() {
588                        return Ok(security_token);
589                    }
590                }
591                Err(err) => {
592                    let field = match err {
593                        DecodeError(_) | InvalidFormat => "security_token",
594                        ParseExpiresError(_) | ValidPeriodExpired(_) => "expires",
595                    };
596                    validation.record_fail(field, err);
597                }
598            }
599        } else {
600            validation.record("security_token", "should be nonempty");
601        }
602        Err(Rejection::bad_request(validation).context(self))
603    }
604
605    /// Attempts to construct an instance of `SessionId` from an HTTP request.
606    /// The value is extracted from the `x-session-id` or `session-id` header.
607    #[cfg(feature = "auth")]
608    fn parse_session_id(&self) -> Result<SessionId, Rejection> {
609        self.get_header("x-session-id")
610            .or_else(|| self.get_header("session-id"))
611            .ok_or_else(|| {
612                Rejection::from_validation_entry(
613                    "session_id",
614                    warn!("a `session-id` or `x-session-id` header is required"),
615                )
616                .context(self)
617            })
618            .and_then(|session_id| {
619                SessionId::parse(session_id).map_err(|err| {
620                    Rejection::from_validation_entry("session_id", err).context(self)
621                })
622            })
623    }
624
625    /// Attempts to construct an instance of `JwtClaims` from an HTTP request.
626    /// The value is extracted from the query parameter `access_token` or
627    /// the `authorization` header.
628    #[cfg(feature = "jwt")]
629    fn parse_jwt_claims<T, K>(&self, key: &K) -> Result<JwtClaims<T>, Rejection>
630    where
631        T: Default + serde::Serialize + DeserializeOwned,
632        K: MACLike,
633    {
634        let (param, mut token) = match self.get_query("access_token") {
635            Some(access_token) => ("access_token", access_token),
636            None => ("authorization", ""),
637        };
638        if let Some(authorization) = self.get_header("authorization") {
639            token = authorization
640                .strip_prefix("Bearer ")
641                .unwrap_or(authorization);
642        }
643        if token.is_empty() {
644            let mut validation = Validation::new();
645            validation.record(param, "JWT token is absent");
646            return Err(Rejection::bad_request(validation).context(self));
647        }
648
649        let mut options = zino_auth::default_verification_options();
650        options.reject_before = self
651            .get_query("timestamp")
652            .and_then(|s| s.parse().ok())
653            .map(|i| Duration::from_secs(i).into());
654        options.required_nonce = self.get_query("nonce").map(|s| s.to_owned());
655
656        match key.verify_token(token, Some(options)) {
657            Ok(claims) => Ok(claims.into()),
658            Err(err) => {
659                let message = format!("401 Unauthorized: {err}");
660                Err(Rejection::with_message(message).context(self))
661            }
662        }
663    }
664
665    /// Returns a `Response` or `Rejection` from a model query validation.
666    /// The data is extracted from [`parse_query()`](RequestContext::parse_query).
667    fn query_validation<S>(&self, query: &mut Query) -> Result<Response<S>, Rejection>
668    where
669        Self: Sized,
670        S: ResponseCode,
671    {
672        match self.parse_query() {
673            Ok(data) => {
674                let validation = query.read_map(&data);
675                if validation.is_success() {
676                    Ok(Response::with_context(S::OK, self))
677                } else {
678                    Err(Rejection::bad_request(validation).context(self))
679                }
680            }
681            Err(rejection) => Err(rejection),
682        }
683    }
684
685    /// Returns a `Response` or `Rejection` from a model validation.
686    /// The data is extracted from [`parse_body()`](RequestContext::parse_body).
687    async fn model_validation<M, S>(&mut self, model: &mut M) -> Result<Response<S>, Rejection>
688    where
689        Self: Sized,
690        M: ModelHooks,
691        S: ResponseCode,
692    {
693        let data_type = self.data_type().unwrap_or("form");
694        if data_type.contains('/') {
695            let err = warn!(
696                "deserialization of the data type `{}` is unsupported",
697                data_type
698            );
699            let rejection = Rejection::from_validation_entry("data_type", err).context(self);
700            return Err(rejection);
701        }
702        M::before_extract()
703            .await
704            .map_err(|err| Rejection::from_error(err).context(self))?;
705
706        let is_form = data_type == "form";
707        let bytes = self
708            .read_body_bytes()
709            .await
710            .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
711        let extension = self.get_data::<M::Extension>();
712        if is_form {
713            let mut data = serde_qs::from_bytes(&bytes)
714                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
715            match M::before_validation(&mut data, extension.as_ref()).await {
716                Ok(()) => {
717                    let validation = model.read_map(&data);
718                    model
719                        .after_validation(&mut data)
720                        .await
721                        .map_err(|err| Rejection::from_error(err).context(self))?;
722                    if let Some(extension) = extension {
723                        model
724                            .after_extract(extension)
725                            .await
726                            .map_err(|err| Rejection::from_error(err).context(self))?;
727                    }
728                    if validation.is_success() {
729                        Ok(Response::with_context(S::OK, self))
730                    } else {
731                        Err(Rejection::bad_request(validation).context(self))
732                    }
733                }
734                Err(err) => Err(Rejection::from_error(err).context(self)),
735            }
736        } else {
737            let mut data = serde_json::from_slice(&bytes)
738                .map_err(|err| Rejection::from_validation_entry("body", err).context(self))?;
739            match M::before_validation(&mut data, extension.as_ref()).await {
740                Ok(()) => {
741                    let validation = model.read_map(&data);
742                    model
743                        .after_validation(&mut data)
744                        .await
745                        .map_err(|err| Rejection::from_error(err).context(self))?;
746                    if let Some(extension) = extension {
747                        model
748                            .after_extract(extension)
749                            .await
750                            .map_err(|err| Rejection::from_error(err).context(self))?;
751                    }
752                    if validation.is_success() {
753                        Ok(Response::with_context(S::OK, self))
754                    } else {
755                        Err(Rejection::bad_request(validation).context(self))
756                    }
757                }
758                Err(err) => Err(Rejection::from_error(err).context(self)),
759            }
760        }
761    }
762
763    /// Makes an HTTP request to the provided URL.
764    async fn fetch(&self, url: &str, options: Option<&Map>) -> Result<reqwest::Response, Error> {
765        let trace_context = self.new_trace_context();
766        Agent::request_builder(url, options)?
767            .header("traceparent", trace_context.traceparent())
768            .header("tracestate", trace_context.tracestate())
769            .send()
770            .await
771            .map_err(Error::from)
772    }
773
774    /// Makes an HTTP request to the provided URL and
775    /// deserializes the response body via JSON.
776    async fn fetch_json<T: DeserializeOwned>(
777        &self,
778        url: &str,
779        options: Option<&Map>,
780    ) -> Result<T, Error> {
781        let response = self.fetch(url, options).await?.error_for_status()?;
782        let data = if response.headers().has_json_content_type() {
783            response.json().await?
784        } else {
785            let text = response.text().await?;
786            serde_json::from_str(&text)?
787        };
788        Ok(data)
789    }
790
791    /// Translates the localization message.
792    #[cfg(feature = "i18n")]
793    fn translate(&self, message: &str, args: Option<FluentArgs>) -> Result<SharedString, Error> {
794        if let Some(locale) = self.locale() {
795            i18n::translate(&locale, message, args)
796        } else {
797            let default_locale = i18n::DEFAULT_LOCALE.parse()?;
798            i18n::translate(&default_locale, message, args)
799        }
800    }
801
802    /// Constructs a new subscription instance.
803    fn subscription(&self) -> Subscription {
804        let mut subscription = self.parse_query::<Subscription>().unwrap_or_default();
805        if subscription.session_id().is_none() {
806            if let Some(session_id) = self.session_id() {
807                subscription.set_session_id(Some(session_id));
808            }
809        }
810        subscription
811    }
812
813    /// Constructs a new cloud event instance.
814    fn cloud_event(&self, event_type: SharedString, data: JsonValue) -> CloudEvent {
815        let id = self.request_id();
816        let source = self.instance();
817        let mut event = CloudEvent::new(id, source, event_type);
818        if let Some(session_id) = self.session_id() {
819            event.set_session_id(session_id);
820        }
821        event.set_data(data);
822        event
823    }
824}