use std::collections::HashMap;
use std::sync::Arc;
use grpcio::{Channel, ChannelBuilder, ChannelCredentialsBuilder, Environment};
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
use crate::exporter::Compression;
use crate::ExportConfig;
use super::default_headers;
// Logs client (`GrpcioLogsClient`); compiled only when the `logs` feature is enabled.
#[cfg(feature = "logs")]
mod logs;
// Trace client (`GrpcioTraceClient`); compiled only when the `trace` feature is enabled.
#[cfg(feature = "trace")]
mod trace;
// Metrics client (`GrpcioMetricsClient`); compiled only when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
mod metrics;
/// Settings that are specific to the grpcio transport.
///
/// These values are consumed by `GrpcioExporterBuilder` when it creates the
/// gRPC channel and the per-signal clients.
#[derive(Debug)]
#[non_exhaustive]
pub struct GrpcioConfig {
/// Certificate/key pair for the channel. When present it is installed on the
/// channel credentials regardless of the value of `use_tls`.
pub credentials: Option<Credentials>,
/// Headers handed to the exporter clients. `None` is treated as an empty map
/// when the clients are built.
pub headers: Option<HashMap<String, String>>,
/// Default compression algorithm to configure on the channel, if any.
pub compression: Option<Compression>,
/// When `Some(true)` and no `credentials` are set, the channel is given
/// default-built channel credentials. `None` and `Some(false)` leave the
/// channel without credentials (unless `credentials` is set).
pub use_tls: Option<bool>,
/// Completion queue count passed to `grpcio::Environment::new` when the
/// channel is created.
pub completion_queue_count: usize,
}
impl Default for GrpcioConfig {
    /// Builds the baseline grpcio configuration.
    ///
    /// The headers start out as the crate-wide `default_headers()`, two
    /// completion queues are requested, and credentials, compression and TLS
    /// are all left unset.
    fn default() -> Self {
        Self {
            completion_queue_count: 2,
            headers: Some(default_headers()),
            use_tls: None,
            compression: None,
            credentials: None,
        }
    }
}
/// Certificate and key pair used to build channel credentials.
///
/// Both values are converted with `.into()` and passed to
/// `ChannelCredentialsBuilder::cert` when the channel is created.
/// Serde support is derived only when the `serialize` feature is enabled.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Debug)]
pub struct Credentials {
/// Certificate, passed as the first argument to `ChannelCredentialsBuilder::cert`.
// NOTE(review): presumably PEM-encoded text — confirm against grpcio's docs.
pub cert: String,
/// Key, passed as the second argument to `ChannelCredentialsBuilder::cert`.
// NOTE(review): presumably the PEM-encoded private key — confirm.
pub key: String,
}
impl From<Compression> for grpcio::CompressionAlgorithms {
fn from(compression: Compression) -> Self {
match compression {
Compression::Gzip => grpcio::CompressionAlgorithms::GRPC_COMPRESS_GZIP,
}
}
}
/// Builder for exporters that use the grpcio transport.
///
/// Start from `GrpcioExporterBuilder::default()`, adjust it with the `with_*`
/// methods, then call one of the feature-gated `build_*_exporter` methods.
#[derive(Default, Debug)]
pub struct GrpcioExporterBuilder {
// Transport-independent settings; this file reads `endpoint` and `timeout` from it.
pub(crate) exporter_config: ExportConfig,
// grpcio-specific settings (credentials, headers, compression, TLS, queue count).
pub(crate) grpcio_config: GrpcioConfig,
}
impl GrpcioExporterBuilder {
pub fn with_credentials(mut self, credentials: Credentials) -> Self {
self.grpcio_config.credentials = Some(credentials);
self
}
pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
let mut inst_headers = self.grpcio_config.headers.unwrap_or_default();
inst_headers.extend(headers);
self.grpcio_config.headers = Some(inst_headers);
self
}
pub fn with_compression(mut self, compression: Compression) -> Self {
self.grpcio_config.compression = Some(compression);
self
}
pub fn with_tls(mut self, use_tls: bool) -> Self {
self.grpcio_config.use_tls = Some(use_tls);
self
}
pub fn with_completion_queue_count(mut self, count: usize) -> Self {
self.grpcio_config.completion_queue_count = count;
self
}
fn build_channel(&mut self) -> Result<Channel, crate::Error> {
let mut builder: ChannelBuilder = ChannelBuilder::new(Arc::new(Environment::new(
self.grpcio_config.completion_queue_count,
)));
if let Some(compression) = self.grpcio_config.compression {
builder = builder.default_compression_algorithm(compression.into());
}
let channel = match (
self.grpcio_config.credentials.take(),
self.grpcio_config.use_tls.take(),
) {
(None, Some(true)) => builder
.set_credentials(ChannelCredentialsBuilder::new().build())
.connect(self.exporter_config.endpoint.as_str()),
(None, _) => builder.connect(self.exporter_config.endpoint.as_str()),
(Some(credentials), _) => builder
.set_credentials(
ChannelCredentialsBuilder::new()
.cert(credentials.cert.into(), credentials.key.into())
.build(),
)
.connect(self.exporter_config.endpoint.as_str()),
};
Ok(channel)
}
#[cfg(feature = "trace")]
pub fn build_span_exporter(
mut self,
) -> Result<crate::SpanExporter, opentelemetry::trace::TraceError> {
use opentelemetry_proto::grpcio::collector::trace::v1::TraceServiceClient;
use self::trace::GrpcioTraceClient;
let channel = self.build_channel()?;
let client = GrpcioTraceClient::new(
TraceServiceClient::new(channel),
self.exporter_config.timeout,
self.grpcio_config.headers.unwrap_or_default(),
);
Ok(crate::SpanExporter::new(client))
}
#[cfg(feature = "logs")]
pub fn build_log_exporter(
mut self,
) -> Result<crate::logs::LogExporter, opentelemetry::logs::LogError> {
use self::logs::GrpcioLogsClient;
use opentelemetry_proto::grpcio::collector::logs::v1::LogsServiceClient;
let channel = self.build_channel()?;
let client = GrpcioLogsClient::new(
LogsServiceClient::new(channel),
self.exporter_config.timeout,
self.grpcio_config.headers.unwrap_or_default(),
);
Ok(crate::logs::LogExporter::new(client))
}
#[cfg(feature = "metrics")]
pub fn build_metrics_exporter(
mut self,
aggregation_selector: Box<dyn opentelemetry_sdk::metrics::reader::AggregationSelector>,
temporality_selector: Box<dyn opentelemetry_sdk::metrics::reader::TemporalitySelector>,
) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
use self::metrics::GrpcioMetricsClient;
use opentelemetry_proto::grpcio::collector::metrics::v1::MetricsServiceClient;
let channel = self.build_channel()?;
let client = GrpcioMetricsClient::new(
MetricsServiceClient::new(channel),
self.exporter_config.timeout,
self.grpcio_config.headers.unwrap_or_default(),
);
Ok(crate::MetricsExporter::new(
client,
temporality_selector,
aggregation_selector,
))
}
}
#[cfg(test)]
mod tests {
    use crate::GrpcioExporterBuilder;
    use std::collections::HashMap;

    /// `with_headers` must keep the default headers, add new keys, and let a
    /// caller-supplied value replace a default with the same key.
    #[test]
    fn test_with_headers() {
        // A brand-new key is added next to the default `User-Agent`.
        let extra = HashMap::from([("key".to_string(), "value".to_string())]);
        let merged = GrpcioExporterBuilder::default()
            .with_headers(extra)
            .grpcio_config
            .headers
            .unwrap();
        assert_eq!(merged.get("key").map(String::as_str), Some("value"));
        assert!(merged.contains_key("User-Agent"));

        // Supplying `User-Agent` replaces the default value without adding an entry.
        let replacement = HashMap::from([("User-Agent".to_string(), "baz".to_string())]);
        let merged = GrpcioExporterBuilder::default()
            .with_headers(replacement)
            .grpcio_config
            .headers
            .unwrap();
        assert_eq!(merged.get("User-Agent").map(String::as_str), Some("baz"));

        let default_len = GrpcioExporterBuilder::default()
            .grpcio_config
            .headers
            .unwrap()
            .len();
        assert_eq!(merged.len(), default_len);
    }
}