opentelemetry_otlp/exporter/
mod.rs

1//! OTLP exporter builder and configurations.
2//!
3//! OTLP supports sending data via different protocols and formats.
4
5#[cfg(any(feature = "http-proto", feature = "http-json"))]
6use crate::exporter::http::HttpExporterBuilder;
7#[cfg(feature = "grpc-tonic")]
8use crate::exporter::tonic::TonicExporterBuilder;
9use crate::Protocol;
10#[cfg(feature = "serialize")]
11use serde::{Deserialize, Serialize};
12use std::fmt::{Display, Formatter};
13use std::str::FromStr;
14use std::time::Duration;
15use thiserror::Error;
16
17/// Target to which the exporter is going to send signals, defaults to https://localhost:4317.
18/// Learn about the relationship between this constant and metrics/spans/logs at
19/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#endpoint-urls-for-otlphttp>
20pub const OTEL_EXPORTER_OTLP_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_ENDPOINT";
21/// Default target to which the exporter is going to send signals.
22pub const OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT: &str = OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT;
23/// Key-value pairs to be used as headers associated with gRPC or HTTP requests
24/// Example: `k1=v1,k2=v2`
25/// Note: as of now, this is only supported for HTTP requests.
26pub const OTEL_EXPORTER_OTLP_HEADERS: &str = "OTEL_EXPORTER_OTLP_HEADERS";
27/// Protocol the exporter will use. Either `http/protobuf` or `grpc`.
28pub const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
29/// Compression algorithm to use, defaults to none.
30pub const OTEL_EXPORTER_OTLP_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_COMPRESSION";
31
32#[cfg(feature = "http-json")]
33/// Default protocol, using http-json.
34pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON;
35#[cfg(all(feature = "http-proto", not(feature = "http-json")))]
36/// Default protocol, using http-proto.
37pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF;
38#[cfg(all(
39    feature = "grpc-tonic",
40    not(any(feature = "http-proto", feature = "http-json"))
41))]
42/// Default protocol, using grpc
43pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = OTEL_EXPORTER_OTLP_PROTOCOL_GRPC;
44
45#[cfg(not(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json")))]
46/// Default protocol if no features are enabled.
47pub const OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT: &str = "";
48
49const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf";
50const OTEL_EXPORTER_OTLP_PROTOCOL_GRPC: &str = "grpc";
51const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON: &str = "http/json";
52
53/// Max waiting time for the backend to process each signal batch, defaults to 10 seconds.
54pub const OTEL_EXPORTER_OTLP_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TIMEOUT";
55/// Default max waiting time for the backend to process each signal batch.
56pub const OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT: Duration = Duration::from_millis(10000);
57
58// Endpoints per protocol https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md
59#[cfg(feature = "grpc-tonic")]
60const OTEL_EXPORTER_OTLP_GRPC_ENDPOINT_DEFAULT: &str = "http://localhost:4317";
61const OTEL_EXPORTER_OTLP_HTTP_ENDPOINT_DEFAULT: &str = "http://localhost:4318";
62
63#[cfg(any(feature = "http-proto", feature = "http-json"))]
64pub(crate) mod http;
65#[cfg(feature = "grpc-tonic")]
66pub(crate) mod tonic;
67
68/// Configuration for the OTLP exporter.
69#[derive(Debug)]
70pub struct ExportConfig {
71    /// The address of the OTLP collector.
72    /// Default address will be used based on the protocol.
73    ///
74    /// Note: Programmatically setting this will override any value set via the environment variable.
75    pub endpoint: Option<String>,
76
77    /// The protocol to use when communicating with the collector.
78    pub protocol: Protocol,
79
80    /// The timeout to the collector.
81    /// The default value is 10 seconds.
82    ///
83    /// Note: Programmatically setting this will override any value set via the environment variable.
84    pub timeout: Option<Duration>,
85}
86
87impl Default for ExportConfig {
88    fn default() -> Self {
89        let protocol = default_protocol();
90
91        Self {
92            endpoint: None,
93            // don't use default_endpoint(protocol) here otherwise we
94            // won't know if user provided a value
95            protocol,
96            timeout: None,
97        }
98    }
99}
100
101#[derive(Error, Debug)]
102/// Errors that can occur while building an exporter.
103// TODO: Refine and polish this.
104// Non-exhaustive to allow for future expansion without breaking changes.
105// This could be refined after polishing and finalizing the errors.
106#[non_exhaustive]
107pub enum ExporterBuildError {
108    /// Spawning a new thread failed.
109    #[error("Spawning a new thread failed. Unable to create Reqwest-Blocking client.")]
110    ThreadSpawnFailed,
111
112    /// Feature required to use the specified compression algorithm.
113    #[cfg(any(not(feature = "gzip-tonic"), not(feature = "zstd-tonic")))]
114    #[error("feature '{0}' is required to use the compression algorithm '{1}'")]
115    FeatureRequiredForCompressionAlgorithm(&'static str, Compression),
116
117    /// No Http client specified.
118    #[error("no http client specified")]
119    NoHttpClient,
120
121    /// Unsupported compression algorithm.
122    #[error("unsupported compression algorithm '{0}'")]
123    UnsupportedCompressionAlgorithm(String),
124
125    /// Invalid URI.
126    #[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
127    #[error("invalid URI {0}. Reason {1}")]
128    InvalidUri(String, String),
129
130    /// Failed due to an internal error.
131    /// The error message is intended for logging purposes only and should not
132    /// be used to make programmatic decisions. It is implementation-specific
133    /// and subject to change without notice. Consumers of this error should not
134    /// rely on its content beyond logging.
135    #[error("Reason: {0}")]
136    InternalFailure(String),
137}
138
139/// The compression algorithm to use when sending data.
140#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
141#[derive(Clone, Copy, Debug, Eq, PartialEq)]
142pub enum Compression {
143    /// Compresses data using gzip.
144    Gzip,
145    /// Compresses data using zstd.
146    Zstd,
147}
148
149impl Display for Compression {
150    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
151        match self {
152            Compression::Gzip => write!(f, "gzip"),
153            Compression::Zstd => write!(f, "zstd"),
154        }
155    }
156}
157
158impl FromStr for Compression {
159    type Err = ExporterBuildError;
160
161    fn from_str(s: &str) -> Result<Self, Self::Err> {
162        match s {
163            "gzip" => Ok(Compression::Gzip),
164            "zstd" => Ok(Compression::Zstd),
165            _ => Err(ExporterBuildError::UnsupportedCompressionAlgorithm(
166                s.to_string(),
167            )),
168        }
169    }
170}
171
172/// default protocol based on enabled features
173fn default_protocol() -> Protocol {
174    match OTEL_EXPORTER_OTLP_PROTOCOL_DEFAULT {
175        OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF => Protocol::HttpBinary,
176        OTEL_EXPORTER_OTLP_PROTOCOL_GRPC => Protocol::Grpc,
177        OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON => Protocol::HttpJson,
178        _ => Protocol::HttpBinary,
179    }
180}
181
182/// default user-agent headers
183#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
184fn default_headers() -> std::collections::HashMap<String, String> {
185    let mut headers = std::collections::HashMap::new();
186    headers.insert(
187        "User-Agent".to_string(),
188        format!("OTel OTLP Exporter Rust/{}", env!("CARGO_PKG_VERSION")),
189    );
190    headers
191}
192
193/// Provide access to the [ExportConfig] field within the exporter builders.
194pub trait HasExportConfig {
195    /// Return a mutable reference to the [ExportConfig] within the exporter builders.
196    fn export_config(&mut self) -> &mut ExportConfig;
197}
198
199/// Provide [ExportConfig] access to the [TonicExporterBuilder].
200#[cfg(feature = "grpc-tonic")]
201impl HasExportConfig for TonicExporterBuilder {
202    fn export_config(&mut self) -> &mut ExportConfig {
203        &mut self.exporter_config
204    }
205}
206
207/// Provide [ExportConfig] access to the [HttpExporterBuilder].
208#[cfg(any(feature = "http-proto", feature = "http-json"))]
209impl HasExportConfig for HttpExporterBuilder {
210    fn export_config(&mut self) -> &mut ExportConfig {
211        &mut self.exporter_config
212    }
213}
214
215/// Expose methods to override [ExportConfig].
216///
217/// This trait will be implemented for every struct that implemented [`HasExportConfig`] trait.
218///
219/// ## Examples
220/// ```
221/// # #[cfg(all(feature = "trace", feature = "grpc-tonic"))]
222/// # {
223/// use crate::opentelemetry_otlp::WithExportConfig;
224/// let exporter_builder = opentelemetry_otlp::SpanExporter::builder()
225///     .with_tonic()
226///     .with_endpoint("http://localhost:7201");
227/// # }
228/// ```
229pub trait WithExportConfig {
230    /// Set the address of the OTLP collector. If not set or set to empty string, the default address is used.
231    ///
232    /// Note: Programmatically setting this will override any value set via the environment variable.
233    fn with_endpoint<T: Into<String>>(self, endpoint: T) -> Self;
234    /// Set the protocol to use when communicating with the collector.
235    ///
236    /// Note that protocols that are not supported by exporters will be ignored. The exporter
237    /// will use default protocol in this case.
238    ///
239    /// ## Note
240    /// All exporters in this crate only support one protocol, thus choosing the protocol is a no-op at the moment.
241    fn with_protocol(self, protocol: Protocol) -> Self;
242    /// Set the timeout to the collector.
243    ///
244    /// Note: Programmatically setting this will override any value set via the environment variable.
245    fn with_timeout(self, timeout: Duration) -> Self;
246    /// Set export config. This will override all previous configurations.
247    ///
248    /// Note: Programmatically setting this will override any value set via environment variables.
249    fn with_export_config(self, export_config: ExportConfig) -> Self;
250}
251
252impl<B: HasExportConfig> WithExportConfig for B {
253    fn with_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
254        self.export_config().endpoint = Some(endpoint.into());
255        self
256    }
257
258    fn with_protocol(mut self, protocol: Protocol) -> Self {
259        self.export_config().protocol = protocol;
260        self
261    }
262
263    fn with_timeout(mut self, timeout: Duration) -> Self {
264        self.export_config().timeout = Some(timeout);
265        self
266    }
267
268    fn with_export_config(mut self, exporter_config: ExportConfig) -> Self {
269        self.export_config().endpoint = exporter_config.endpoint;
270        self.export_config().protocol = exporter_config.protocol;
271        self.export_config().timeout = exporter_config.timeout;
272        self
273    }
274}
275
276#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
277fn resolve_timeout(signal_timeout_var: &str, provided_timeout: Option<&Duration>) -> Duration {
278    // programmatic configuration overrides any value set via environment variables
279    if let Some(timeout) = provided_timeout {
280        *timeout
281    } else if let Some(timeout) = std::env::var(signal_timeout_var)
282        .ok()
283        .and_then(|s| s.parse().ok())
284    {
285        // per signal env var is not modified
286        Duration::from_millis(timeout)
287    } else if let Some(timeout) = std::env::var(OTEL_EXPORTER_OTLP_TIMEOUT)
288        .ok()
289        .and_then(|s| s.parse().ok())
290    {
291        // if signal env var is not set, then we check if the OTEL_EXPORTER_OTLP_TIMEOUT env var is set
292        Duration::from_millis(timeout)
293    } else {
294        OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
295    }
296}
297
298#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
299fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, String)> {
300    value
301        .split_terminator(',')
302        .map(str::trim)
303        .filter_map(parse_header_key_value_string)
304}
305
306#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
307fn url_decode(value: &str) -> Option<String> {
308    let mut result = String::with_capacity(value.len());
309    let mut chars_to_decode = Vec::<u8>::new();
310    let mut all_chars = value.chars();
311
312    loop {
313        let ch = all_chars.next();
314
315        if ch.is_some() && ch.unwrap() == '%' {
316            chars_to_decode.push(
317                u8::from_str_radix(&format!("{}{}", all_chars.next()?, all_chars.next()?), 16)
318                    .ok()?,
319            );
320            continue;
321        }
322
323        if !chars_to_decode.is_empty() {
324            result.push_str(std::str::from_utf8(&chars_to_decode).ok()?);
325            chars_to_decode.clear();
326        }
327
328        if let Some(c) = ch {
329            result.push(c);
330        } else {
331            return Some(result);
332        }
333    }
334}
335
336#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
337fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, String)> {
338    key_value_string
339        .split_once('=')
340        .map(|(key, value)| {
341            (
342                key.trim(),
343                url_decode(value.trim()).unwrap_or(value.to_string()),
344            )
345        })
346        .filter(|(key, value)| !key.is_empty() && !value.is_empty())
347}
348
349#[cfg(test)]
350#[cfg(any(feature = "grpc-tonic", feature = "http-proto", feature = "http-json"))]
351mod tests {
352    pub(crate) fn run_env_test<T, F>(env_vars: T, f: F)
353    where
354        F: FnOnce(),
355        T: Into<Vec<(&'static str, &'static str)>>,
356    {
357        temp_env::with_vars(
358            env_vars
359                .into()
360                .iter()
361                .map(|&(k, v)| (k, Some(v)))
362                .collect::<Vec<(&'static str, Option<&'static str>)>>(),
363            f,
364        )
365    }
366
367    #[cfg(any(feature = "http-proto", feature = "http-json"))]
368    #[test]
369    fn test_default_http_endpoint() {
370        let exporter_builder = crate::HttpExporterBuilder::default();
371
372        assert_eq!(exporter_builder.exporter_config.endpoint, None);
373    }
374
375    #[cfg(feature = "logs")]
376    #[cfg(any(feature = "http-proto", feature = "http-json"))]
377    #[test]
378    fn export_builder_error_invalid_http_endpoint() {
379        use std::time::Duration;
380
381        use crate::{ExportConfig, LogExporter, Protocol, WithExportConfig};
382
383        let ex_config = ExportConfig {
384            endpoint: Some("invalid_uri/something".to_string()),
385            protocol: Protocol::HttpBinary,
386            timeout: Some(Duration::from_secs(10)),
387        };
388
389        let exporter_result = LogExporter::builder()
390            .with_http()
391            .with_export_config(ex_config)
392            .build();
393
394        assert!(
395            matches!(
396                exporter_result,
397                Err(crate::exporter::ExporterBuildError::InvalidUri(_, _))
398            ),
399            "Expected InvalidUri error, but got {:?}",
400            exporter_result
401        );
402    }
403
404    #[cfg(feature = "grpc-tonic")]
405    #[tokio::test]
406    async fn export_builder_error_invalid_grpc_endpoint() {
407        use std::time::Duration;
408
409        use crate::{ExportConfig, LogExporter, Protocol, WithExportConfig};
410
411        let ex_config = ExportConfig {
412            endpoint: Some("invalid_uri/something".to_string()),
413            protocol: Protocol::Grpc,
414            timeout: Some(Duration::from_secs(10)),
415        };
416
417        let exporter_result = LogExporter::builder()
418            .with_tonic()
419            .with_export_config(ex_config)
420            .build();
421
422        assert!(matches!(
423            exporter_result,
424            Err(crate::exporter::ExporterBuildError::InvalidUri(_, _))
425        ));
426    }
427
428    #[cfg(feature = "grpc-tonic")]
429    #[test]
430    fn test_default_tonic_endpoint() {
431        let exporter_builder = crate::TonicExporterBuilder::default();
432
433        assert_eq!(exporter_builder.exporter_config.endpoint, None);
434    }
435
436    #[test]
437    fn test_default_protocol() {
438        #[cfg(all(
439            feature = "http-json",
440            not(any(feature = "grpc-tonic", feature = "http-proto"))
441        ))]
442        {
443            assert_eq!(
444                crate::exporter::default_protocol(),
445                crate::Protocol::HttpJson
446            );
447        }
448
449        #[cfg(all(
450            feature = "http-proto",
451            not(any(feature = "grpc-tonic", feature = "http-json"))
452        ))]
453        {
454            assert_eq!(
455                crate::exporter::default_protocol(),
456                crate::Protocol::HttpBinary
457            );
458        }
459
460        #[cfg(all(
461            feature = "grpc-tonic",
462            not(any(feature = "http-proto", feature = "http-json"))
463        ))]
464        {
465            assert_eq!(crate::exporter::default_protocol(), crate::Protocol::Grpc);
466        }
467    }
468
469    #[test]
470    fn test_url_decode() {
471        let test_cases = vec![
472            // Format: (encoded, expected_decoded)
473            ("v%201", Some("v 1")),
474            ("v 1", Some("v 1")),
475            ("%C3%B6%C3%A0%C2%A7%C3%96abcd%C3%84", Some("öà§ÖabcdÄ")),
476            ("v%XX1", None),
477        ];
478
479        for (encoded, expected_decoded) in test_cases {
480            assert_eq!(
481                super::url_decode(encoded),
482                expected_decoded.map(|v| v.to_string()),
483            )
484        }
485    }
486
487    #[test]
488    fn test_parse_header_string() {
489        let test_cases = vec![
490            // Format: (input_str, expected_headers)
491            ("k1=v1", vec![("k1", "v1")]),
492            ("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
493            ("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
494            ("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
495        ];
496
497        for (input_str, expected_headers) in test_cases {
498            assert_eq!(
499                super::parse_header_string(input_str).collect::<Vec<_>>(),
500                expected_headers
501                    .into_iter()
502                    .map(|(k, v)| (k, v.to_string()))
503                    .collect::<Vec<_>>(),
504            )
505        }
506    }
507
508    #[test]
509    fn test_parse_header_key_value_string() {
510        let test_cases = vec![
511            // Format: (input_str, expected_header)
512            ("k1=v1", Some(("k1", "v1"))),
513            (
514                "Authentication=Basic AAA",
515                Some(("Authentication", "Basic AAA")),
516            ),
517            (
518                "Authentication=Basic%20AAA",
519                Some(("Authentication", "Basic AAA")),
520            ),
521            ("k1=%XX", Some(("k1", "%XX"))),
522            ("", None),
523            ("=v1", None),
524            ("k1=", None),
525        ];
526
527        for (input_str, expected_headers) in test_cases {
528            assert_eq!(
529                super::parse_header_key_value_string(input_str),
530                expected_headers.map(|(k, v)| (k, v.to_string())),
531            )
532        }
533    }
534
535    #[test]
536    fn test_priority_of_signal_env_over_generic_env_for_timeout() {
537        run_env_test(
538            vec![
539                (crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, "3000"),
540                (super::OTEL_EXPORTER_OTLP_TIMEOUT, "2000"),
541            ],
542            || {
543                let timeout =
544                    super::resolve_timeout(crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None);
545                assert_eq!(timeout.as_millis(), 3000);
546            },
547        );
548    }
549
550    #[test]
551    fn test_priority_of_code_based_config_over_envs_for_timeout() {
552        run_env_test(
553            vec![
554                (crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, "3000"),
555                (super::OTEL_EXPORTER_OTLP_TIMEOUT, "2000"),
556            ],
557            || {
558                let timeout = super::resolve_timeout(
559                    crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
560                    Some(&std::time::Duration::from_millis(1000)),
561                );
562                assert_eq!(timeout.as_millis(), 1000);
563            },
564        );
565    }
566
567    #[test]
568    fn test_use_default_when_others_missing_for_timeout() {
569        run_env_test(vec![], || {
570            let timeout = super::resolve_timeout(crate::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None);
571            assert_eq!(timeout.as_millis(), 10_000);
572        });
573    }
574}