async_graphql/extensions/
opentelemetry.rs1use std::sync::Arc;
2
3use async_graphql_parser::types::ExecutableDocument;
4use async_graphql_value::Variables;
5use futures_util::{stream::BoxStream, TryFutureExt};
6use opentelemetry::{
7 trace::{FutureExt, SpanKind, TraceContextExt, Tracer},
8 Context as OpenTelemetryContext, Key, KeyValue,
9};
10
11use crate::{
12 extensions::{
13 Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextRequest,
14 NextResolve, NextSubscribe, NextValidation, ResolveInfo,
15 },
16 Response, ServerError, ServerResult, ValidationResult, Value,
17};
18
19const KEY_SOURCE: Key = Key::from_static_str("graphql.source");
20const KEY_VARIABLES: Key = Key::from_static_str("graphql.variables");
21const KEY_PARENT_TYPE: Key = Key::from_static_str("graphql.parentType");
22const KEY_RETURN_TYPE: Key = Key::from_static_str("graphql.returnType");
23const KEY_ERROR: Key = Key::from_static_str("graphql.error");
24const KEY_COMPLEXITY: Key = Key::from_static_str("graphql.complexity");
25const KEY_DEPTH: Key = Key::from_static_str("graphql.depth");
26
27#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
29pub struct OpenTelemetry<T> {
30 tracer: Arc<T>,
31}
32
33impl<T> OpenTelemetry<T> {
34 pub fn new(tracer: T) -> OpenTelemetry<T>
36 where
37 T: Tracer + Send + Sync + 'static,
38 <T as Tracer>::Span: Sync + Send,
39 {
40 Self {
41 tracer: Arc::new(tracer),
42 }
43 }
44}
45
46impl<T> ExtensionFactory for OpenTelemetry<T>
47where
48 T: Tracer + Send + Sync + 'static,
49 <T as Tracer>::Span: Sync + Send,
50{
51 fn create(&self) -> Arc<dyn Extension> {
52 Arc::new(OpenTelemetryExtension {
53 tracer: self.tracer.clone(),
54 })
55 }
56}
57
58struct OpenTelemetryExtension<T> {
59 tracer: Arc<T>,
60}
61
62#[async_trait::async_trait]
63impl<T> Extension for OpenTelemetryExtension<T>
64where
65 T: Tracer + Send + Sync + 'static,
66 <T as Tracer>::Span: Sync + Send,
67{
68 async fn request(&self, ctx: &ExtensionContext<'_>, next: NextRequest<'_>) -> Response {
69 next.run(ctx)
70 .with_context(OpenTelemetryContext::current_with_span(
71 self.tracer
72 .span_builder("request")
73 .with_kind(SpanKind::Server)
74 .start(&*self.tracer),
75 ))
76 .await
77 }
78
79 fn subscribe<'s>(
80 &self,
81 ctx: &ExtensionContext<'_>,
82 stream: BoxStream<'s, Response>,
83 next: NextSubscribe<'_>,
84 ) -> BoxStream<'s, Response> {
85 Box::pin(
86 next.run(ctx, stream)
87 .with_context(OpenTelemetryContext::current_with_span(
88 self.tracer
89 .span_builder("subscribe")
90 .with_kind(SpanKind::Server)
91 .start(&*self.tracer),
92 )),
93 )
94 }
95
96 async fn parse_query(
97 &self,
98 ctx: &ExtensionContext<'_>,
99 query: &str,
100 variables: &Variables,
101 next: NextParseQuery<'_>,
102 ) -> ServerResult<ExecutableDocument> {
103 let attributes = vec![
104 KeyValue::new(KEY_SOURCE, query.to_string()),
105 KeyValue::new(KEY_VARIABLES, serde_json::to_string(variables).unwrap()),
106 ];
107 let span = self
108 .tracer
109 .span_builder("parse")
110 .with_kind(SpanKind::Server)
111 .with_attributes(attributes)
112 .start(&*self.tracer);
113
114 async move {
115 let res = next.run(ctx, query, variables).await;
116 if let Ok(doc) = &res {
117 OpenTelemetryContext::current()
118 .span()
119 .set_attribute(KeyValue::new(
120 KEY_SOURCE,
121 ctx.stringify_execute_doc(doc, variables),
122 ));
123 }
124 res
125 }
126 .with_context(OpenTelemetryContext::current_with_span(span))
127 .await
128 }
129
130 async fn validation(
131 &self,
132 ctx: &ExtensionContext<'_>,
133 next: NextValidation<'_>,
134 ) -> Result<ValidationResult, Vec<ServerError>> {
135 let span = self
136 .tracer
137 .span_builder("validation")
138 .with_kind(SpanKind::Server)
139 .start(&*self.tracer);
140 next.run(ctx)
141 .with_context(OpenTelemetryContext::current_with_span(span))
142 .map_ok(|res| {
143 let current_cx = OpenTelemetryContext::current();
144 let span = current_cx.span();
145 span.set_attribute(KeyValue::new(KEY_COMPLEXITY, res.complexity as i64));
146 span.set_attribute(KeyValue::new(KEY_DEPTH, res.depth as i64));
147 res
148 })
149 .await
150 }
151
152 async fn execute(
153 &self,
154 ctx: &ExtensionContext<'_>,
155 operation_name: Option<&str>,
156 next: NextExecute<'_>,
157 ) -> Response {
158 let span = self
159 .tracer
160 .span_builder("execute")
161 .with_kind(SpanKind::Server)
162 .start(&*self.tracer);
163 next.run(ctx, operation_name)
164 .with_context(OpenTelemetryContext::current_with_span(span))
165 .await
166 }
167
168 async fn resolve(
169 &self,
170 ctx: &ExtensionContext<'_>,
171 info: ResolveInfo<'_>,
172 next: NextResolve<'_>,
173 ) -> ServerResult<Option<Value>> {
174 let span = if !info.is_for_introspection {
175 let attributes = vec![
176 KeyValue::new(KEY_PARENT_TYPE, info.parent_type.to_string()),
177 KeyValue::new(KEY_RETURN_TYPE, info.return_type.to_string()),
178 ];
179 Some(
180 self.tracer
181 .span_builder(info.path_node.to_string())
182 .with_kind(SpanKind::Server)
183 .with_attributes(attributes)
184 .start(&*self.tracer),
185 )
186 } else {
187 None
188 };
189
190 let fut = next.run(ctx, info).inspect_err(|err| {
191 let current_cx = OpenTelemetryContext::current();
192 current_cx.span().add_event(
193 "error".to_string(),
194 vec![KeyValue::new(KEY_ERROR, err.to_string())],
195 );
196 });
197
198 match span {
199 Some(span) => {
200 fut.with_context(OpenTelemetryContext::current_with_span(span))
201 .await
202 }
203 None => fut.await,
204 }
205 }
206}