async_graphql/extensions/
opentelemetry.rs

1use std::sync::Arc;
2
3use async_graphql_parser::types::ExecutableDocument;
4use async_graphql_value::Variables;
5use futures_util::{stream::BoxStream, TryFutureExt};
6use opentelemetry::{
7    trace::{FutureExt, SpanKind, TraceContextExt, Tracer},
8    Context as OpenTelemetryContext, Key, KeyValue,
9};
10
11use crate::{
12    extensions::{
13        Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextRequest,
14        NextResolve, NextSubscribe, NextValidation, ResolveInfo,
15    },
16    Response, ServerError, ServerResult, ValidationResult, Value,
17};
18
// Attribute keys recorded on the spans and events produced by the extension
// defined in this file.

// Query text attached to the "parse" span; after a successful parse it is
// overwritten with the output of `ctx.stringify_execute_doc`.
19const KEY_SOURCE: Key = Key::from_static_str("graphql.source");
// Request variables, serialized to a JSON string, attached to the "parse" span.
20const KEY_VARIABLES: Key = Key::from_static_str("graphql.variables");
// `ResolveInfo::parent_type` of the field, attached to each per-field resolve span.
21const KEY_PARENT_TYPE: Key = Key::from_static_str("graphql.parentType");
// `ResolveInfo::return_type` of the field, attached to each per-field resolve span.
22const KEY_RETURN_TYPE: Key = Key::from_static_str("graphql.returnType");
// Stringified resolver error, attached to the "error" event added in `resolve`.
23const KEY_ERROR: Key = Key::from_static_str("graphql.error");
// `ValidationResult::complexity`, recorded as an i64 after successful validation.
24const KEY_COMPLEXITY: Key = Key::from_static_str("graphql.complexity");
// `ValidationResult::depth`, recorded as an i64 after successful validation.
25const KEY_DEPTH: Key = Key::from_static_str("graphql.depth");
26
27/// OpenTelemetry extension
28#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
29pub struct OpenTelemetry<T> {
30    tracer: Arc<T>,
31}
32
33impl<T> OpenTelemetry<T> {
34    /// Use `tracer` to create an OpenTelemetry extension.
35    pub fn new(tracer: T) -> OpenTelemetry<T>
36    where
37        T: Tracer + Send + Sync + 'static,
38        <T as Tracer>::Span: Sync + Send,
39    {
40        Self {
41            tracer: Arc::new(tracer),
42        }
43    }
44}
45
46impl<T> ExtensionFactory for OpenTelemetry<T>
47where
48    T: Tracer + Send + Sync + 'static,
49    <T as Tracer>::Span: Sync + Send,
50{
51    fn create(&self) -> Arc<dyn Extension> {
52        Arc::new(OpenTelemetryExtension {
53            tracer: self.tracer.clone(),
54        })
55    }
56}
57
58struct OpenTelemetryExtension<T> {
59    tracer: Arc<T>,
60}
61
62#[async_trait::async_trait]
63impl<T> Extension for OpenTelemetryExtension<T>
64where
65    T: Tracer + Send + Sync + 'static,
66    <T as Tracer>::Span: Sync + Send,
67{
68    async fn request(&self, ctx: &ExtensionContext<'_>, next: NextRequest<'_>) -> Response {
69        next.run(ctx)
70            .with_context(OpenTelemetryContext::current_with_span(
71                self.tracer
72                    .span_builder("request")
73                    .with_kind(SpanKind::Server)
74                    .start(&*self.tracer),
75            ))
76            .await
77    }
78
79    fn subscribe<'s>(
80        &self,
81        ctx: &ExtensionContext<'_>,
82        stream: BoxStream<'s, Response>,
83        next: NextSubscribe<'_>,
84    ) -> BoxStream<'s, Response> {
85        Box::pin(
86            next.run(ctx, stream)
87                .with_context(OpenTelemetryContext::current_with_span(
88                    self.tracer
89                        .span_builder("subscribe")
90                        .with_kind(SpanKind::Server)
91                        .start(&*self.tracer),
92                )),
93        )
94    }
95
96    async fn parse_query(
97        &self,
98        ctx: &ExtensionContext<'_>,
99        query: &str,
100        variables: &Variables,
101        next: NextParseQuery<'_>,
102    ) -> ServerResult<ExecutableDocument> {
103        let attributes = vec![
104            KeyValue::new(KEY_SOURCE, query.to_string()),
105            KeyValue::new(KEY_VARIABLES, serde_json::to_string(variables).unwrap()),
106        ];
107        let span = self
108            .tracer
109            .span_builder("parse")
110            .with_kind(SpanKind::Server)
111            .with_attributes(attributes)
112            .start(&*self.tracer);
113
114        async move {
115            let res = next.run(ctx, query, variables).await;
116            if let Ok(doc) = &res {
117                OpenTelemetryContext::current()
118                    .span()
119                    .set_attribute(KeyValue::new(
120                        KEY_SOURCE,
121                        ctx.stringify_execute_doc(doc, variables),
122                    ));
123            }
124            res
125        }
126        .with_context(OpenTelemetryContext::current_with_span(span))
127        .await
128    }
129
130    async fn validation(
131        &self,
132        ctx: &ExtensionContext<'_>,
133        next: NextValidation<'_>,
134    ) -> Result<ValidationResult, Vec<ServerError>> {
135        let span = self
136            .tracer
137            .span_builder("validation")
138            .with_kind(SpanKind::Server)
139            .start(&*self.tracer);
140        next.run(ctx)
141            .with_context(OpenTelemetryContext::current_with_span(span))
142            .map_ok(|res| {
143                let current_cx = OpenTelemetryContext::current();
144                let span = current_cx.span();
145                span.set_attribute(KeyValue::new(KEY_COMPLEXITY, res.complexity as i64));
146                span.set_attribute(KeyValue::new(KEY_DEPTH, res.depth as i64));
147                res
148            })
149            .await
150    }
151
152    async fn execute(
153        &self,
154        ctx: &ExtensionContext<'_>,
155        operation_name: Option<&str>,
156        next: NextExecute<'_>,
157    ) -> Response {
158        let span = self
159            .tracer
160            .span_builder("execute")
161            .with_kind(SpanKind::Server)
162            .start(&*self.tracer);
163        next.run(ctx, operation_name)
164            .with_context(OpenTelemetryContext::current_with_span(span))
165            .await
166    }
167
168    async fn resolve(
169        &self,
170        ctx: &ExtensionContext<'_>,
171        info: ResolveInfo<'_>,
172        next: NextResolve<'_>,
173    ) -> ServerResult<Option<Value>> {
174        let span = if !info.is_for_introspection {
175            let attributes = vec![
176                KeyValue::new(KEY_PARENT_TYPE, info.parent_type.to_string()),
177                KeyValue::new(KEY_RETURN_TYPE, info.return_type.to_string()),
178            ];
179            Some(
180                self.tracer
181                    .span_builder(info.path_node.to_string())
182                    .with_kind(SpanKind::Server)
183                    .with_attributes(attributes)
184                    .start(&*self.tracer),
185            )
186        } else {
187            None
188        };
189
190        let fut = next.run(ctx, info).inspect_err(|err| {
191            let current_cx = OpenTelemetryContext::current();
192            current_cx.span().add_event(
193                "error".to_string(),
194                vec![KeyValue::new(KEY_ERROR, err.to_string())],
195            );
196        });
197
198        match span {
199            Some(span) => {
200                fut.with_context(OpenTelemetryContext::current_with_span(span))
201                    .await
202            }
203            None => fut.await,
204        }
205    }
206}