use std::fmt::{self, Debug};
use std::time::Duration;
#[cfg(feature = "grpc-tonic")]
use std::str::FromStr;
#[cfg(feature = "grpc-tonic")]
use {
crate::exporter::tonic::{resolve_compression, TonicConfig, TonicExporterBuilder},
opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient as TonicTraceServiceClient,
ExportTraceServiceRequest as TonicRequest,
},
tonic::{
metadata::{KeyAndValueRef, MetadataMap},
transport::Channel as TonicChannel,
Request,
},
};
#[cfg(feature = "grpc-sys")]
use {
crate::exporter::grpcio::{GrpcioConfig, GrpcioExporterBuilder},
grpcio::{
CallOption, Channel as GrpcChannel, ChannelBuilder, ChannelCredentialsBuilder, Environment,
MetadataBuilder,
},
opentelemetry_proto::grpcio::{
trace_service::ExportTraceServiceRequest as GrpcRequest,
trace_service_grpc::TraceServiceClient as GrpcioTraceServiceClient,
},
};
#[cfg(feature = "http-proto")]
use {
crate::exporter::http::{HttpConfig, HttpExporterBuilder},
http::{
header::{HeaderName, HeaderValue, CONTENT_TYPE},
Method, Uri,
},
opentelemetry_http::HttpClient,
opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest as ProstRequest,
prost::Message,
std::convert::TryFrom,
};
#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use {std::collections::HashMap, std::sync::Arc};
use crate::exporter::ExportConfig;
use crate::OtlpPipeline;
use opentelemetry_api::{
global,
trace::{TraceError, TracerProvider},
};
use opentelemetry_sdk::{
self as sdk,
export::trace::{ExportResult, SpanData},
trace::BatchMessage,
};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use async_trait::async_trait;
use futures_core::future::BoxFuture;
use sdk::runtime::RuntimeChannel;
pub const OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";
pub const OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_TRACES_TIMEOUT";
pub const OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_TRACES_COMPRESSION";
impl OtlpPipeline {
pub fn tracing(self) -> OtlpTracePipeline {
OtlpTracePipeline::default()
}
}
#[derive(Default, Debug)]
pub struct OtlpTracePipeline {
exporter_builder: Option<SpanExporterBuilder>,
trace_config: Option<sdk::trace::Config>,
batch_config: Option<sdk::trace::BatchConfig>,
}
impl OtlpTracePipeline {
pub fn with_trace_config(mut self, trace_config: sdk::trace::Config) -> Self {
self.trace_config = Some(trace_config);
self
}
pub fn with_batch_config(mut self, batch_config: sdk::trace::BatchConfig) -> Self {
self.batch_config = Some(batch_config);
self
}
pub fn with_exporter<B: Into<SpanExporterBuilder>>(mut self, pipeline: B) -> Self {
self.exporter_builder = Some(pipeline.into());
self
}
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
Ok(build_simple_with_exporter(
self.exporter_builder
.ok_or(crate::Error::NoExporterBuilder)?
.build_span_exporter()?,
self.trace_config,
))
}
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
Ok(build_batch_with_exporter(
self.exporter_builder
.ok_or(crate::Error::NoExporterBuilder)?
.build_span_exporter()?,
self.trace_config,
runtime,
self.batch_config,
))
}
}
fn build_simple_with_exporter(
exporter: SpanExporter,
trace_config: Option<sdk::trace::Config>,
) -> sdk::trace::Tracer {
let mut provider_builder = sdk::trace::TracerProvider::builder().with_simple_exporter(exporter);
if let Some(config) = trace_config {
provider_builder = provider_builder.with_config(config);
}
let provider = provider_builder.build();
let tracer = provider.versioned_tracer(
"opentelemetry-otlp",
Some(env!("CARGO_PKG_VERSION")),
Some(SCHEMA_URL),
None,
);
let _ = global::set_tracer_provider(provider);
tracer
}
fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
exporter: SpanExporter,
trace_config: Option<sdk::trace::Config>,
runtime: R,
batch_config: Option<sdk::trace::BatchConfig>,
) -> sdk::trace::Tracer {
let mut provider_builder = sdk::trace::TracerProvider::builder();
let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();
provider_builder = provider_builder.with_span_processor(batch_processor);
if let Some(config) = trace_config {
provider_builder = provider_builder.with_config(config);
}
let provider = provider_builder.build();
let tracer = provider.versioned_tracer(
"opentelemetry-otlp",
Some(env!("CARGO_PKG_VERSION")),
Some(SCHEMA_URL),
None,
);
let _ = global::set_tracer_provider(provider);
tracer
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum SpanExporterBuilder {
#[cfg(feature = "grpc-tonic")]
Tonic(TonicExporterBuilder),
#[cfg(feature = "grpc-sys")]
Grpcio(GrpcioExporterBuilder),
#[cfg(feature = "http-proto")]
Http(HttpExporterBuilder),
}
impl SpanExporterBuilder {
pub fn build_span_exporter(self) -> Result<SpanExporter, TraceError> {
match self {
#[cfg(feature = "grpc-tonic")]
SpanExporterBuilder::Tonic(builder) => Ok(match builder.channel {
Some(channel) => SpanExporter::from_tonic_channel(
builder.exporter_config,
builder.tonic_config,
channel,
),
None => SpanExporter::new_tonic(builder.exporter_config, builder.tonic_config),
}?),
#[cfg(feature = "grpc-sys")]
SpanExporterBuilder::Grpcio(builder) => Ok(SpanExporter::new_grpcio(
builder.exporter_config,
builder.grpcio_config,
)),
#[cfg(feature = "http-proto")]
SpanExporterBuilder::Http(builder) => Ok(SpanExporter::new_http(
builder.exporter_config,
builder.http_config,
)?),
}
}
}
#[cfg(feature = "grpc-tonic")]
impl From<TonicExporterBuilder> for SpanExporterBuilder {
fn from(exporter: TonicExporterBuilder) -> Self {
SpanExporterBuilder::Tonic(exporter)
}
}
#[cfg(feature = "grpc-sys")]
impl From<GrpcioExporterBuilder> for SpanExporterBuilder {
fn from(exporter: GrpcioExporterBuilder) -> Self {
SpanExporterBuilder::Grpcio(exporter)
}
}
#[cfg(feature = "http-proto")]
impl From<HttpExporterBuilder> for SpanExporterBuilder {
fn from(exporter: HttpExporterBuilder) -> Self {
SpanExporterBuilder::Http(exporter)
}
}
pub enum SpanExporter {
#[cfg(feature = "grpc-tonic")]
Tonic {
timeout: Duration,
metadata: Option<MetadataMap>,
trace_exporter: TonicTraceServiceClient<TonicChannel>,
},
#[cfg(feature = "grpc-sys")]
Grpcio {
timeout: Duration,
headers: Option<HashMap<String, String>>,
trace_exporter: GrpcioTraceServiceClient,
},
#[cfg(feature = "http-proto")]
Http {
timeout: Duration,
headers: Option<HashMap<String, String>>,
collector_endpoint: Uri,
trace_exporter: Option<Arc<dyn HttpClient>>,
},
}
impl Debug for SpanExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "grpc-tonic")]
SpanExporter::Tonic {
metadata, timeout, ..
} => f
.debug_struct("Exporter")
.field("metadata", &metadata)
.field("timeout", &timeout)
.field("trace_exporter", &"TraceServiceClient")
.finish(),
#[cfg(feature = "grpc-sys")]
SpanExporter::Grpcio {
headers, timeout, ..
} => f
.debug_struct("Exporter")
.field("headers", &headers)
.field("timeout", &timeout)
.field("trace_exporter", &"TraceServiceClient")
.finish(),
#[cfg(feature = "http-proto")]
SpanExporter::Http {
headers, timeout, ..
} => f
.debug_struct("Exporter")
.field("headers", &headers)
.field("timeout", &timeout)
.field("trace_exporter", &"TraceServiceClient")
.finish(),
}
}
}
impl SpanExporter {
#[cfg(feature = "grpc-tonic")]
pub fn new_tonic(
config: ExportConfig,
tonic_config: TonicConfig,
) -> Result<Self, crate::Error> {
let endpoint_str = match std::env::var(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) {
Ok(val) => val,
Err(_) => format!("{}{}", config.endpoint, "/v1/traces"),
};
let endpoint = TonicChannel::from_shared(endpoint_str)?;
let _timeout = match std::env::var(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT) {
Ok(val) => match u64::from_str(&val) {
Ok(seconds) => Duration::from_secs(seconds),
Err(_) => config.timeout,
},
Err(_) => config.timeout,
};
#[cfg(feature = "tls")]
let channel = match tonic_config.tls_config.as_ref() {
Some(tls_config) => endpoint.tls_config(tls_config.clone())?,
None => endpoint,
}
.timeout(_timeout)
.connect_lazy();
#[cfg(not(feature = "tls"))]
let channel = endpoint.timeout(_timeout).connect_lazy();
SpanExporter::from_tonic_channel(config, tonic_config, channel)
}
#[cfg(feature = "grpc-tonic")]
pub fn from_tonic_channel(
config: ExportConfig,
tonic_config: TonicConfig,
channel: tonic::transport::Channel,
) -> Result<Self, crate::Error> {
let mut trace_exporter = TonicTraceServiceClient::new(channel);
if let Some(compression) =
resolve_compression(&tonic_config, OTEL_EXPORTER_OTLP_TRACES_COMPRESSION)?
{
trace_exporter = trace_exporter.send_compressed(compression)
}
Ok(SpanExporter::Tonic {
timeout: config.timeout,
metadata: tonic_config.metadata,
trace_exporter,
})
}
#[cfg(feature = "grpc-sys")]
pub fn new_grpcio(config: ExportConfig, grpcio_config: GrpcioConfig) -> Self {
let mut builder: ChannelBuilder = ChannelBuilder::new(Arc::new(Environment::new(
grpcio_config.completion_queue_count,
)));
if let Some(compression) = grpcio_config.compression {
builder = builder.default_compression_algorithm(compression.into());
}
let channel: GrpcChannel = match (grpcio_config.credentials, grpcio_config.use_tls) {
(None, Some(true)) => builder
.set_credentials(ChannelCredentialsBuilder::new().build())
.connect(config.endpoint.as_str()),
(None, _) => builder.connect(config.endpoint.as_str()),
(Some(credentials), _) => builder
.set_credentials(
ChannelCredentialsBuilder::new()
.cert(credentials.cert.into(), credentials.key.into())
.build(),
)
.connect(config.endpoint.as_str()),
};
SpanExporter::Grpcio {
trace_exporter: GrpcioTraceServiceClient::new(channel),
timeout: config.timeout,
headers: grpcio_config.headers,
}
}
#[cfg(feature = "http-proto")]
pub fn new_http(config: ExportConfig, http_config: HttpConfig) -> Result<Self, crate::Error> {
let url: Uri = config
.endpoint
.parse()
.map_err::<crate::Error, _>(Into::into)?;
Ok(SpanExporter::Http {
trace_exporter: http_config.client,
timeout: config.timeout,
collector_endpoint: url,
headers: http_config.headers,
})
}
}
#[cfg(feature = "grpc-sys")]
async fn grpcio_send_request(
trace_exporter: GrpcioTraceServiceClient,
request: GrpcRequest,
call_options: CallOption,
) -> ExportResult {
let receiver = trace_exporter
.export_async_opt(&request, call_options)
.map_err::<crate::Error, _>(Into::into)?;
receiver.await.map_err::<crate::Error, _>(Into::into)?;
Ok(())
}
#[cfg(feature = "tonic")]
async fn tonic_send_request(
trace_exporter: TonicTraceServiceClient<TonicChannel>,
request: Request<TonicRequest>,
) -> ExportResult {
trace_exporter
.to_owned()
.export(request)
.await
.map_err::<crate::Error, _>(Into::into)?;
Ok(())
}
#[cfg(feature = "http-proto")]
async fn http_send_request(
batch: Vec<SpanData>,
client: std::sync::Arc<dyn HttpClient>,
headers: Option<HashMap<String, String>>,
collector_endpoint: Uri,
) -> ExportResult {
use opentelemetry_http::ResponseExt;
let req = ProstRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
};
let mut buf = vec![];
req.encode(&mut buf)
.map_err::<crate::Error, _>(Into::into)?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(collector_endpoint)
.header(CONTENT_TYPE, "application/x-protobuf")
.body(buf)
.map_err::<crate::Error, _>(Into::into)?;
if let Some(headers) = headers {
for (k, val) in headers {
let value =
HeaderValue::from_str(val.as_ref()).map_err::<crate::Error, _>(Into::into)?;
let key = HeaderName::try_from(&k).map_err::<crate::Error, _>(Into::into)?;
request.headers_mut().insert(key, value);
}
}
client.send(request).await?.error_for_status()?;
Ok(())
}
#[async_trait]
impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
match self {
#[cfg(feature = "grpc-sys")]
SpanExporter::Grpcio {
timeout,
headers,
trace_exporter,
} => {
let request = GrpcRequest {
resource_spans: protobuf::RepeatedField::from_vec(
batch.into_iter().map(Into::into).collect(),
),
unknown_fields: Default::default(),
cached_size: Default::default(),
};
let mut call_options = CallOption::default().timeout(*timeout);
if let Some(headers) = headers.clone() {
let mut metadata_builder: MetadataBuilder = MetadataBuilder::new();
for (key, value) in headers {
let _ = metadata_builder.add_str(key.as_str(), value.as_str());
}
call_options = call_options.headers(metadata_builder.build());
}
Box::pin(grpcio_send_request(
trace_exporter.clone(),
request,
call_options,
))
}
#[cfg(feature = "grpc-tonic")]
SpanExporter::Tonic {
trace_exporter,
metadata,
..
} => {
let mut request = Request::new(TonicRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
});
if let Some(metadata) = metadata {
for key_and_value in metadata.iter() {
match key_and_value {
KeyAndValueRef::Ascii(key, value) => {
request.metadata_mut().append(key, value.to_owned())
}
KeyAndValueRef::Binary(key, value) => {
request.metadata_mut().append_bin(key, value.to_owned())
}
};
}
}
Box::pin(tonic_send_request(trace_exporter.to_owned(), request))
}
#[cfg(feature = "http-proto")]
SpanExporter::Http {
trace_exporter,
collector_endpoint,
headers,
..
} => {
if let Some(ref client) = trace_exporter {
let client = Arc::clone(client);
Box::pin(http_send_request(
batch,
client,
headers.clone(),
collector_endpoint.clone(),
))
} else {
Box::pin(std::future::ready(Err(crate::Error::NoHttpClient.into())))
}
}
}
}
}