madsim_rdkafka/std/
config.rs

1//! Producer and consumer configuration.
2//!
3//! ## C library configuration
4//!
5//! The Rust library will forward all the configuration to the C library. The
6//! most frequently used parameters are listed here.
7//!
8//! ### Frequently used parameters
9//!
10//! For producer-specific and consumer-specific parameters check the producer
11//! and consumer modules documentation. The full list of available parameters is
12//! available in the [librdkafka documentation][librdkafka-config].
13//!
14//! - `client.id`: Client identifier. Default: `rdkafka`.
15//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host
16//!    or host:port. Default: empty.
17//! - `message.max.bytes`: Maximum message size. Default: 1000000.
18//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to
19//!    print all the debugging information. Default: empty (off).
20//! - `statistics.interval.ms`: how often the statistic callback
21//!    specified in the [`ClientContext`] will be called. Default: 0 (disabled).
22//!
23//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
24
25use std::collections::HashMap;
26use std::ffi::{CStr, CString};
27use std::iter::FromIterator;
28use std::os::raw::c_char;
29use std::ptr;
30
31use rdkafka_sys as rdsys;
32use rdkafka_sys::types::*;
33
34use crate::client::ClientContext;
35use crate::error::{IsError, KafkaError, KafkaResult};
36use crate::log::{log_enabled, DEBUG, INFO, WARN};
37use crate::util::{ErrBuf, KafkaDrop, NativePtr};
38
/// The log levels supported by librdkafka.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Emerg = 0,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Alert = 1,
    /// Higher priority than [`Level::Error`](log::Level::Error) from the log
    /// crate.
    Critical = 2,
    /// Equivalent to [`Level::Error`](log::Level::Error) from the log crate.
    Error = 3,
    /// Equivalent to [`Level::Warn`](log::Level::Warn) from the log crate.
    Warning = 4,
    /// Higher priority than [`Level::Info`](log::Level::Info) from the log
    /// crate.
    Notice = 5,
    /// Equivalent to [`Level::Info`](log::Level::Info) from the log crate.
    Info = 6,
    /// Equivalent to [`Level::Debug`](log::Level::Debug) from the log crate.
    Debug = 7,
}
63
64impl RDKafkaLogLevel {
65    pub(crate) fn from_int(level: i32) -> RDKafkaLogLevel {
66        match level {
67            0 => RDKafkaLogLevel::Emerg,
68            1 => RDKafkaLogLevel::Alert,
69            2 => RDKafkaLogLevel::Critical,
70            3 => RDKafkaLogLevel::Error,
71            4 => RDKafkaLogLevel::Warning,
72            5 => RDKafkaLogLevel::Notice,
73            6 => RDKafkaLogLevel::Info,
74            _ => RDKafkaLogLevel::Debug,
75        }
76    }
77}
78
79//
80// ********** CLIENT CONFIG **********
81//
82
/// A native rdkafka-sys client config.
///
/// Owns the raw `RDKafkaConf` pointer; the wrapped `NativePtr` destroys the
/// underlying object when this value is dropped.
pub struct NativeClientConfig {
    ptr: NativePtr<RDKafkaConf>,
}

// SAFETY: `rd_kafka_conf_destroy` is librdkafka's designated destructor for
// `RDKafkaConf` objects, so running it exactly once on drop (as `NativePtr`
// does) is the correct way to release the resource.
unsafe impl KafkaDrop for RDKafkaConf {
    const TYPE: &'static str = "client config";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_conf_destroy;
}
92
93impl NativeClientConfig {
94    /// Wraps a pointer to an `RDKafkaConfig` object and returns a new `NativeClientConfig`.
95    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaConf) -> NativeClientConfig {
96        NativeClientConfig {
97            ptr: NativePtr::from_ptr(ptr).unwrap(),
98        }
99    }
100
101    /// Returns the pointer to the librdkafka RDKafkaConf structure.
102    pub fn ptr(&self) -> *mut RDKafkaConf {
103        self.ptr.ptr()
104    }
105
106    /// Gets the value of a parameter in the configuration.
107    ///
108    /// This method reflects librdkafka's view of the current value of the
109    /// parameter. If the parameter was overridden by the user, it returns the
110    /// user-specified value. Otherwise, it returns librdkafka's default value
111    /// for the parameter.
112    pub fn get(&self, key: &str) -> KafkaResult<String> {
113        let make_err = |res| {
114            KafkaError::ClientConfig(
115                res,
116                match res {
117                    RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN => "Unknown configuration name",
118                    RDKafkaConfRes::RD_KAFKA_CONF_INVALID => "Invalid configuration value",
119                    RDKafkaConfRes::RD_KAFKA_CONF_OK => "OK",
120                }
121                .into(),
122                key.into(),
123                "".into(),
124            )
125        };
126        let key_c = CString::new(key.to_string())?;
127
128        // Call with a `NULL` buffer to determine the size of the string.
129        let mut size = 0_usize;
130        let res = unsafe {
131            rdsys::rd_kafka_conf_get(self.ptr(), key_c.as_ptr(), ptr::null_mut(), &mut size)
132        };
133        if res.is_error() {
134            return Err(make_err(res));
135        }
136
137        // Allocate a buffer of that size and call again to get the actual
138        // string.
139        let mut buf = vec![0_u8; size];
140        let res = unsafe {
141            rdsys::rd_kafka_conf_get(
142                self.ptr(),
143                key_c.as_ptr(),
144                buf.as_mut_ptr() as *mut c_char,
145                &mut size,
146            )
147        };
148        if res.is_error() {
149            return Err(make_err(res));
150        }
151
152        // Convert the C string to a Rust string.
153        Ok(CStr::from_bytes_with_nul(&buf)
154            .unwrap()
155            .to_string_lossy()
156            .into())
157    }
158}
159
/// Client configuration.
///
/// A map of string key/value pairs that are forwarded verbatim to librdkafka
/// when a client is created, plus the log level to configure on that client.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    // Parameters to forward to librdkafka, keyed by configuration name.
    conf_map: HashMap<String, String>,
    /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
    /// of available levels.
    pub log_level: RDKafkaLogLevel,
}
168
169impl Default for ClientConfig {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
175impl ClientConfig {
176    /// Creates a new empty configuration.
177    pub fn new() -> ClientConfig {
178        ClientConfig {
179            conf_map: HashMap::new(),
180            log_level: log_level_from_global_config(),
181        }
182    }
183
184    /// Gets a reference to the underlying config map
185    pub fn config_map(&self) -> &HashMap<String, String> {
186        &self.conf_map
187    }
188
189    /// Gets the value of a parameter in the configuration.
190    ///
191    /// Returns the current value set for `key`, or `None` if no value for `key`
192    /// exists.
193    ///
194    /// Note that this method will only ever return values that were installed
195    /// by a call to [`ClientConfig::set`]. To retrieve librdkafka's default
196    /// value for a parameter, build a [`NativeClientConfig`] and then call
197    /// [`NativeClientConfig::get`] on the resulting object.
198    pub fn get(&self, key: &str) -> Option<&str> {
199        self.conf_map.get(key).map(|val| val.as_str())
200    }
201
202    /// Sets a parameter in the configuration.
203    ///
204    /// If there is an existing value for `key` in the configuration, it is
205    /// overridden with the new `value`.
206    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut ClientConfig
207    where
208        K: Into<String>,
209        V: Into<String>,
210    {
211        self.conf_map.insert(key.into(), value.into());
212        self
213    }
214
215    /// Removes a parameter from the configuration.
216    pub fn remove<'a>(&'a mut self, key: &str) -> &'a mut ClientConfig {
217        self.conf_map.remove(key);
218        self
219    }
220
221    /// Sets the log level of the client. If not specified, the log level will be calculated based
222    /// on the global log level of the log crate.
223    pub fn set_log_level(&mut self, log_level: RDKafkaLogLevel) -> &mut ClientConfig {
224        self.log_level = log_level;
225        self
226    }
227
228    /// Builds a native librdkafka configuration.
229    pub fn create_native_config(&self) -> KafkaResult<NativeClientConfig> {
230        let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) };
231        let mut err_buf = ErrBuf::new();
232        for (key, value) in &self.conf_map {
233            let key_c = CString::new(key.to_string())?;
234            let value_c = CString::new(value.to_string())?;
235            let ret = unsafe {
236                rdsys::rd_kafka_conf_set(
237                    conf.ptr(),
238                    key_c.as_ptr(),
239                    value_c.as_ptr(),
240                    err_buf.as_mut_ptr(),
241                    err_buf.capacity(),
242                )
243            };
244            if ret.is_error() {
245                return Err(KafkaError::ClientConfig(
246                    ret,
247                    err_buf.to_string(),
248                    key.to_string(),
249                    value.to_string(),
250                ));
251            }
252        }
253        Ok(conf)
254    }
255
256    /// Uses the current configuration to create a new Consumer or Producer.
257    pub async fn create<T: FromClientConfig>(&self) -> KafkaResult<T> {
258        T::from_config(self).await
259    }
260
261    /// Uses the current configuration and the provided context to create a new Consumer or Producer.
262    pub async fn create_with_context<C, T>(&self, context: C) -> KafkaResult<T>
263    where
264        C: ClientContext,
265        T: FromClientConfigAndContext<C>,
266    {
267        T::from_config_and_context(self, context).await
268    }
269}
270
271impl FromIterator<(String, String)> for ClientConfig {
272    fn from_iter<I>(iter: I) -> ClientConfig
273    where
274        I: IntoIterator<Item = (String, String)>,
275    {
276        let mut config = ClientConfig::new();
277        config.extend(iter);
278        config
279    }
280}
281
282impl Extend<(String, String)> for ClientConfig {
283    fn extend<I>(&mut self, iter: I)
284    where
285        I: IntoIterator<Item = (String, String)>,
286    {
287        self.conf_map.extend(iter)
288    }
289}
290
291/// Return the log level
292fn log_level_from_global_config() -> RDKafkaLogLevel {
293    if log_enabled!(target: "librdkafka", DEBUG) {
294        RDKafkaLogLevel::Debug
295    } else if log_enabled!(target: "librdkafka", INFO) {
296        RDKafkaLogLevel::Info
297    } else if log_enabled!(target: "librdkafka", WARN) {
298        RDKafkaLogLevel::Warning
299    } else {
300        RDKafkaLogLevel::Error
301    }
302}
303
/// Create a new client based on the provided configuration.
///
/// Implemented by client types (consumers and producers) that can be
/// constructed from a [`ClientConfig`] alone.
#[async_trait::async_trait]
pub trait FromClientConfig: Sized {
    /// Creates a client from a client configuration. The default client context
    /// will be used.
    async fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}
311
/// Create a new client based on the provided configuration and context.
///
/// Implemented by client types that accept a custom [`ClientContext`] in
/// addition to a [`ClientConfig`].
#[async_trait::async_trait]
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
    /// Creates a client from a client configuration and a client context.
    async fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}
318
#[cfg(test)]
mod tests {
    use super::ClientConfig;

    #[test]
    fn test_client_config_set_map() {
        // Seed the config with two entries through the `FromIterator` impl.
        let pairs: Vec<(String, String)> = vec![("a".into(), "1".into()), ("b".into(), "1".into())];
        let mut config: ClientConfig = pairs.into_iter().collect();
        // `extend` should override "b" and add "c".
        config.extend([("b".into(), "2".into()), ("c".into(), "3".into())]);

        assert_eq!(config.get("a"), Some("1"));
        assert_eq!(config.get("b"), Some("2"));
        assert_eq!(config.get("c"), Some("3"));
    }
}
334}