1use std::collections::HashMap;
26use std::ffi::{CStr, CString};
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// The log levels that can be requested from librdkafka.
///
/// Each variant carries an explicit integer discriminant from 0 to 7, so a
/// level can be converted to its numeric form with `level as i32`. The
/// numbering appears to follow the syslog(3) severity scale (0 is the most
/// severe, 7 the most verbose) — confirm against the librdkafka `log_level`
/// property documentation.
///
/// `PartialEq`/`Eq` are derived so levels can be compared directly and used
/// in `assert_eq!`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RDKafkaLogLevel {
    /// Numeric level 0.
    Emerg = 0,
    /// Numeric level 1.
    Alert = 1,
    /// Numeric level 2.
    Critical = 2,
    /// Numeric level 3.
    Error = 3,
    /// Numeric level 4.
    Warning = 4,
    /// Numeric level 5.
    Notice = 5,
    /// Numeric level 6.
    Info = 6,
    /// Numeric level 7.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65 pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66 match level {
67 0 => RDKafkaLogLevel::Emerg,
68 1 => RDKafkaLogLevel::Alert,
69 2 => RDKafkaLogLevel::Critical,
70 3 => RDKafkaLogLevel::Error,
71 4 => RDKafkaLogLevel::Warning,
72 5 => RDKafkaLogLevel::Notice,
73 6 => RDKafkaLogLevel::Info,
74 _ => RDKafkaLogLevel::Debug,
75 }
76 }
77}
78
79pub struct NativeClientConfig {
85 ptr: NativePtr<RDKafkaConf>,
86}
87
88unsafe impl KafkaDrop for RDKafkaConf {
89 const TYPE: &'static str = "client config";
90 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
91}
92
93impl NativeClientConfig {
94 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
96 NativeClientConfig {
97 ptr: NativePtr::from_ptr(ptr).unwrap(),
98 }
99 }
100
101 pub fn ptr(&self) -> *mut RDKafkaConf {
103 self.ptr.ptr()
104 }
105
106 pub fn get(&self, key: &str) -> KafkaResult<String> {
113 let make_err = |res| {
114 KafkaError::ClientConfig(
115 res,
116 match res {
117 RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
118 RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
119 RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
120 }
121 .into(),
122 key.into(),
123 "".into(),
124 )
125 };
126 let key_c = CString::new(key.to_string())?;
127
128 let mut size = 0_usize;
130 let res = unsafe {
131 rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
132 };
133 if res.is_error() {
134 return Err(make_err(res));
135 }
136
137 let mut buf = vec![0_u8; size];
140 let res = unsafe {
141 rdsys::rd_kafka_conf_get(
142 self.ptr(),
143 key_c.as_ptr(),
144 buf.as_mut_ptr() as *mut c_char,
145 &mut size,
146 )
147 };
148 if res.is_error() {
149 return Err(make_err(res));
150 }
151
152 Ok(CStr::from_bytes_with_nul(&buf)
154 .unwrap()
155 .to_string_lossy()
156 .into())
157 }
158}
159
160#[derive(Clone, Debug)]
162pub struct ClientConfig {
163 conf_map: HashMap<String, String>,
164 pub log_level: RDKafkaLogLevel,
167}
168
169impl Default for ClientConfig {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
175impl ClientConfig {
176 pub fn new() -> ClientConfig {
178 ClientConfig {
179 conf_map: HashMap::new(),
180 log_level: log_level_from_global_config(),
181 }
182 }
183
184 pub fn config_map(&self) -> &HashMap<String, String> {
186 &self.conf_map
187 }
188
189 pub fn get(&self, key: &str) -> Option<&str> {
199 self.conf_map.get(key).map(|val| val.as_str())
200 }
201
202 pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
207 where
208 K: Into<String>,
209 V: Into<String>,
210 {
211 self.conf_map.insert(key.into(), value.into());
212 self
213 }
214
215 pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
217 self.conf_map.remove(key);
218 self
219 }
220
221 pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
224 self.log_level = log_level;
225 self
226 }
227
228 pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
230 let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
231 let mut err_buf = ErrBuf::new();
232 for (key, value) in &self.conf_map {
233 let key_c = CString::new(key.to_string())?;
234 let value_c = CString::new(value.to_string())?;
235 let ret = unsafe {
236 rdsys::rd_kafka_conf_set(
237 conf.ptr(),
238 key_c.as_ptr(),
239 value_c.as_ptr(),
240 err_buf.as_mut_ptr(),
241 err_buf.capacity(),
242 )
243 };
244 if ret.is_error() {
245 return Err(KafkaError::ClientConfig(
246 ret,
247 err_buf.to_string(),
248 key.to_string(),
249 value.to_string(),
250 ));
251 }
252 }
253 Ok(conf)
254 }
255
256 pub async fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
258 T::from_config(self).await
259 }
260
261 pub async fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
263 where
264 C: ClientContext,
265 T: FromClientConfigAndContext<C>,
266 {
267 T::from_config_and_context(self, context).await
268 }
269}
270
271impl FromIterator<(String, String)> for ClientConfig {
272 fn from_iter<I>(iter: I) -> ClientConfig
273 where
274 I: IntoIterator<Item = (String, String)>,
275 {
276 let mut config = ClientConfig::new();
277 config.extend(iter);
278 config
279 }
280}
281
282impl Extend<(String, String)> for ClientConfig {
283 fn extend<I>(&mut self, iter: I)
284 where
285 I: IntoIterator<Item = (String, String)>,
286 {
287 self.conf_map.extend(iter)
288 }
289}
290
291fn log_level_from_global_config() -> RDKafkaLogLevel {
293 if log_enabled!(target: "librdkafka", DEBUG) {
294 RDKafkaLogLevel::Debug
295 } else if log_enabled!(target: "librdkafka", INFO) {
296 RDKafkaLogLevel::Info
297 } else if log_enabled!(target: "librdkafka", WARN) {
298 RDKafkaLogLevel::Warning
299 } else {
300 RDKafkaLogLevel::Error
301 }
302}
303
304#[async_trait::async_trait]
306pub trait FromClientConfig: Sized {
307 async fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
310}
311
312#[async_trait::async_trait]
314pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
315 async fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
317}
318
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    /// Collecting pairs and then extending the configuration behaves like
    /// map insertion: new keys are added and existing keys are overwritten.
    #[test]
    fn test_client_config_set_map() {
        let initial = vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config: ClientConfig = initial.into_iter().collect();
        config.extend([("b".into(), "2".into()), ("c".into(), "3".into())]);

        for (key, expected) in [("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(config.get(key).unwrap(), expected);
        }
    }
}