madsim_rdkafka/std/
admin.rs

1//! Admin client.
2//!
3//! The main object is the [`AdminClient`] struct.
4//!
5//! [`AdminClient`]: struct.AdminClient.html
6
7use std::collections::HashMap;
8use std::ffi::{c_void, CStr, CString};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::thread::{self, JoinHandle};
15use std::time::Duration;
16
17use futures_channel::oneshot;
18use futures_util::future::{self, Either, FutureExt};
19use futures_util::ready;
20
21use rdkafka_sys as rdsys;
22use rdkafka_sys::types::*;
23
24use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
25use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
26use crate::error::{IsError, KafkaError, KafkaResult};
27use crate::log::{trace, warn};
28use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
29
30//
31// ********** ADMIN CLIENT **********
32//
33
34/// A client for the Kafka admin API.
35///
36/// `AdminClient` provides programmatic access to managing a Kafka cluster,
37/// notably manipulating topics, partitions, and configuration parameters.
38pub struct AdminClient<C: ClientContext> {
39    client: Client<C>,
40    queue: Arc<NativeQueue>,
41    should_stop: Arc<AtomicBool>,
42    handle: Option<JoinHandle<()>>,
43}
44
45impl<C: ClientContext> AdminClient<C> {
46    /// Creates new topics according to the provided `NewTopic` specifications.
47    ///
48    /// Note that while the API supports creating multiple topics at once, it
49    /// is not transactional. Creation of some topics may succeed while others
50    /// fail. Be sure to check the result of each individual operation.
51    pub fn create_topics<'a, I>(
52        &self,
53        topics: I,
54        opts: &AdminOptions,
55    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
56    where
57        I: IntoIterator<Item = &'a NewTopic<'a>>,
58    {
59        match self.create_topics_inner(topics, opts) {
60            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
61            Err(err) => Either::Right(future::err(err)),
62        }
63    }
64
65    fn create_topics_inner<'a, I>(
66        &self,
67        topics: I,
68        opts: &AdminOptions,
69    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
70    where
71        I: IntoIterator<Item = &'a NewTopic<'a>>,
72    {
73        let mut native_topics = Vec::new();
74        let mut err_buf = ErrBuf::new();
75        for t in topics {
76            native_topics.push(t.to_native(&mut err_buf)?);
77        }
78        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
79        unsafe {
80            rdsys::rd_kafka_CreateTopics(
81                self.client.native_ptr(),
82                native_topics.as_c_array(),
83                native_topics.len(),
84                native_opts.ptr(),
85                self.queue.ptr(),
86            );
87        }
88        Ok(rx)
89    }
90
91    /// Deletes the named topics.
92    ///
93    /// Note that while the API supports deleting multiple topics at once, it is
94    /// not transactional. Deletion of some topics may succeed while others
95    /// fail. Be sure to check the result of each individual operation.
96    pub fn delete_topics(
97        &self,
98        topic_names: &[&str],
99        opts: &AdminOptions,
100    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
101        match self.delete_topics_inner(topic_names, opts) {
102            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
103            Err(err) => Either::Right(future::err(err)),
104        }
105    }
106
107    fn delete_topics_inner(
108        &self,
109        topic_names: &[&str],
110        opts: &AdminOptions,
111    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
112        let mut native_topics = Vec::new();
113        let mut err_buf = ErrBuf::new();
114        for tn in topic_names {
115            let tn_c = CString::new(*tn)?;
116            let native_topic = unsafe {
117                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
118            };
119            native_topics.push(native_topic);
120        }
121        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
122        unsafe {
123            rdsys::rd_kafka_DeleteTopics(
124                self.client.native_ptr(),
125                native_topics.as_c_array(),
126                native_topics.len(),
127                native_opts.ptr(),
128                self.queue.ptr(),
129            );
130        }
131        Ok(rx)
132    }
133
134    /// Deletes the named groups.
135    pub fn delete_groups(
136        &self,
137        group_names: &[&str],
138        opts: &AdminOptions,
139    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
140        match self.delete_groups_inner(group_names, opts) {
141            Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
142            Err(err) => Either::Right(future::err(err)),
143        }
144    }
145
146    fn delete_groups_inner(
147        &self,
148        group_names: &[&str],
149        opts: &AdminOptions,
150    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
151        let mut native_groups = Vec::new();
152        let mut err_buf = ErrBuf::new();
153        for gn in group_names {
154            let gn_t = CString::new(*gn)?;
155            let native_group = unsafe {
156                NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
157            };
158            native_groups.push(native_group);
159        }
160        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
161
162        unsafe {
163            rdsys::rd_kafka_DeleteGroups(
164                self.client.native_ptr(),
165                native_groups.as_c_array(),
166                native_groups.len(),
167                native_opts.ptr(),
168                self.queue.ptr(),
169            )
170        }
171        Ok(rx)
172    }
173
174    /// Adds additional partitions to existing topics according to the provided
175    /// `NewPartitions` specifications.
176    ///
177    /// Note that while the API supports creating partitions for multiple topics
178    /// at once, it is not transactional. Creation of partitions for some topics
179    /// may succeed while others fail. Be sure to check the result of each
180    /// individual operation.
181    pub fn create_partitions<'a, I>(
182        &self,
183        partitions: I,
184        opts: &AdminOptions,
185    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
186    where
187        I: IntoIterator<Item = &'a NewPartitions<'a>>,
188    {
189        match self.create_partitions_inner(partitions, opts) {
190            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
191            Err(err) => Either::Right(future::err(err)),
192        }
193    }
194
195    fn create_partitions_inner<'a, I>(
196        &self,
197        partitions: I,
198        opts: &AdminOptions,
199    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
200    where
201        I: IntoIterator<Item = &'a NewPartitions<'a>>,
202    {
203        let mut native_partitions = Vec::new();
204        let mut err_buf = ErrBuf::new();
205        for p in partitions {
206            native_partitions.push(p.to_native(&mut err_buf)?);
207        }
208        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
209        unsafe {
210            rdsys::rd_kafka_CreatePartitions(
211                self.client.native_ptr(),
212                native_partitions.as_c_array(),
213                native_partitions.len(),
214                native_opts.ptr(),
215                self.queue.ptr(),
216            );
217        }
218        Ok(rx)
219    }
220
221    /// Retrieves the configuration parameters for the specified resources.
222    ///
223    /// Note that while the API supports describing multiple configurations at
224    /// once, it is not transactional. There is no guarantee that you will see
225    /// a consistent snapshot of the configuration across different resources.
226    pub fn describe_configs<'a, I>(
227        &self,
228        configs: I,
229        opts: &AdminOptions,
230    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
231    where
232        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
233    {
234        match self.describe_configs_inner(configs, opts) {
235            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
236            Err(err) => Either::Right(future::err(err)),
237        }
238    }
239
240    fn describe_configs_inner<'a, I>(
241        &self,
242        configs: I,
243        opts: &AdminOptions,
244    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
245    where
246        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
247    {
248        let mut native_configs = Vec::new();
249        let mut err_buf = ErrBuf::new();
250        for c in configs {
251            let (name, typ) = match c {
252                ResourceSpecifier::Topic(name) => (
253                    CString::new(*name)?,
254                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
255                ),
256                ResourceSpecifier::Group(name) => (
257                    CString::new(*name)?,
258                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
259                ),
260                ResourceSpecifier::Broker(id) => (
261                    CString::new(format!("{}", id))?,
262                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
263                ),
264            };
265            native_configs.push(unsafe {
266                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
267                    typ,
268                    name.as_ptr(),
269                ))
270                .unwrap()
271            });
272        }
273        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
274        unsafe {
275            rdsys::rd_kafka_DescribeConfigs(
276                self.client.native_ptr(),
277                native_configs.as_c_array(),
278                native_configs.len(),
279                native_opts.ptr(),
280                self.queue.ptr(),
281            );
282        }
283        Ok(rx)
284    }
285
286    /// Sets configuration parameters for the specified resources.
287    ///
288    /// Note that while the API supports altering multiple resources at once, it
289    /// is not transactional. Alteration of some resources may succeed while
290    /// others fail. Be sure to check the result of each individual operation.
291    pub fn alter_configs<'a, I>(
292        &self,
293        configs: I,
294        opts: &AdminOptions,
295    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
296    where
297        I: IntoIterator<Item = &'a AlterConfig<'a>>,
298    {
299        match self.alter_configs_inner(configs, opts) {
300            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
301            Err(err) => Either::Right(future::err(err)),
302        }
303    }
304
305    fn alter_configs_inner<'a, I>(
306        &self,
307        configs: I,
308        opts: &AdminOptions,
309    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
310    where
311        I: IntoIterator<Item = &'a AlterConfig<'a>>,
312    {
313        let mut native_configs = Vec::new();
314        let mut err_buf = ErrBuf::new();
315        for c in configs {
316            native_configs.push(c.to_native(&mut err_buf)?);
317        }
318        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
319        unsafe {
320            rdsys::rd_kafka_AlterConfigs(
321                self.client.native_ptr(),
322                native_configs.as_c_array(),
323                native_configs.len(),
324                native_opts.ptr(),
325                self.queue.ptr(),
326            );
327        }
328        Ok(rx)
329    }
330
331    /// Returns the client underlying this admin client.
332    pub fn inner(&self) -> &Client<C> {
333        &self.client
334    }
335}
336
337#[async_trait::async_trait]
338impl FromClientConfig for AdminClient<DefaultClientContext> {
339    async fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
340        AdminClient::from_config_and_context(config, DefaultClientContext).await
341    }
342}
343
344#[async_trait::async_trait]
345impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
346    async fn from_config_and_context(
347        config: &ClientConfig,
348        context: C,
349    ) -> KafkaResult<AdminClient<C>> {
350        let native_config = config.create_native_config()?;
351        // librdkafka only provides consumer and producer types. We follow the
352        // example of the Python bindings in choosing to pretend to be a
353        // producer, as producer clients are allegedly more lightweight. [0]
354        //
355        // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
356        let client = Client::new(
357            config,
358            native_config,
359            RDKafkaType::RD_KAFKA_PRODUCER,
360            context,
361        )?;
362        let queue = Arc::new(client.new_native_queue());
363        let should_stop = Arc::new(AtomicBool::new(false));
364        let handle = start_poll_thread(queue.clone(), should_stop.clone());
365        Ok(AdminClient {
366            client,
367            queue,
368            should_stop,
369            handle: Some(handle),
370        })
371    }
372}
373
374impl<C: ClientContext> Drop for AdminClient<C> {
375    fn drop(&mut self) {
376        trace!("Stopping polling");
377        self.should_stop.store(true, Ordering::Relaxed);
378        trace!("Waiting for polling thread termination");
379        match self.handle.take().unwrap().join() {
380            Ok(()) => trace!("Polling stopped"),
381            Err(e) => warn!("Failure while terminating thread: {:?}", e),
382        };
383    }
384}
385
386fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
387    thread::Builder::new()
388        .name("admin client polling thread".into())
389        .spawn(move || {
390            trace!("Admin polling thread loop started");
391            loop {
392                let event = queue.poll(Duration::from_millis(100));
393                if event.is_null() {
394                    if should_stop.load(Ordering::Relaxed) {
395                        // We received nothing and the thread should stop, so
396                        // break the loop.
397                        break;
398                    }
399                    continue;
400                }
401                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
402                let tx: Box<oneshot::Sender<NativeEvent>> =
403                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
404                let _ = tx.send(event);
405            }
406            trace!("Admin polling thread loop terminated");
407        })
408        .expect("Failed to start polling thread")
409}
410
411type NativeEvent = NativePtr<RDKafkaEvent>;
412
413unsafe impl KafkaDrop for RDKafkaEvent {
414    const TYPE: &'static str = "event";
415    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
416}
417
418unsafe impl Send for NativeEvent {}
419unsafe impl Sync for NativeEvent {}
420
421impl NativePtr<RDKafkaEvent> {
422    fn check_error(&self) -> KafkaResult<()> {
423        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
424        if err.is_error() {
425            Err(KafkaError::AdminOp(err.into()))
426        } else {
427            Ok(())
428        }
429    }
430}
431
432//
433// ********** ADMIN OPTIONS **********
434//
435
436/// Options for an admin API request.
437#[derive(Default)]
438pub struct AdminOptions {
439    request_timeout: Option<Timeout>,
440    operation_timeout: Option<Timeout>,
441    validate_only: bool,
442    broker_id: Option<i32>,
443}
444
445impl AdminOptions {
446    /// Creates a new `AdminOptions`.
447    pub fn new() -> AdminOptions {
448        AdminOptions::default()
449    }
450
451    /// Sets the overall request timeout, including broker lookup, request
452    /// transmission, operation time on broker, and response.
453    ///
454    /// Defaults to the `socket.timeout.ms` configuration parameter.
455    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
456        self.request_timeout = timeout.map(Into::into);
457        self
458    }
459
460    /// Sets the broker's operation timeout, such as the timeout for
461    /// CreateTopics to complete the creation of topics on the controller before
462    /// returning a result to the application.
463    ///
464    /// If unset (the default), the API calls will return immediately after
465    /// triggering the operation.
466    ///
467    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
468    /// respect this option.
469    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
470        self.operation_timeout = timeout.map(Into::into);
471        self
472    }
473
474    /// Tells the broker to only validate the request, without performing the
475    /// requested operation.
476    ///
477    /// Defaults to false.
478    pub fn validate_only(mut self, validate_only: bool) -> Self {
479        self.validate_only = validate_only;
480        self
481    }
482
483    /// Override what broker the admin request will be sent to.
484    ///
485    /// By default, a reasonable broker will be selected automatically. See the
486    /// librdkafka docs on `rd_kafka_AdminOptions_set_broker` for details.
487    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
488        self.broker_id = broker_id.into();
489        self
490    }
491
492    fn to_native(
493        &self,
494        client: *mut RDKafka,
495        err_buf: &mut ErrBuf,
496    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
497        let native_opts = unsafe {
498            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
499                client,
500                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
501            ))
502            .unwrap()
503        };
504
505        if let Some(timeout) = self.request_timeout {
506            let res = unsafe {
507                rdsys::rd_kafka_AdminOptions_set_request_timeout(
508                    native_opts.ptr(),
509                    timeout.as_millis(),
510                    err_buf.as_mut_ptr(),
511                    err_buf.capacity(),
512                )
513            };
514            check_rdkafka_invalid_arg(res, err_buf)?;
515        }
516
517        if let Some(timeout) = self.operation_timeout {
518            let res = unsafe {
519                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
520                    native_opts.ptr(),
521                    timeout.as_millis(),
522                    err_buf.as_mut_ptr(),
523                    err_buf.capacity(),
524                )
525            };
526            check_rdkafka_invalid_arg(res, err_buf)?;
527        }
528
529        if self.validate_only {
530            let res = unsafe {
531                rdsys::rd_kafka_AdminOptions_set_validate_only(
532                    native_opts.ptr(),
533                    1, // true
534                    err_buf.as_mut_ptr(),
535                    err_buf.capacity(),
536                )
537            };
538            check_rdkafka_invalid_arg(res, err_buf)?;
539        }
540
541        if let Some(broker_id) = self.broker_id {
542            let res = unsafe {
543                rdsys::rd_kafka_AdminOptions_set_broker(
544                    native_opts.ptr(),
545                    broker_id,
546                    err_buf.as_mut_ptr(),
547                    err_buf.capacity(),
548                )
549            };
550            check_rdkafka_invalid_arg(res, err_buf)?;
551        }
552
553        let (tx, rx) = oneshot::channel();
554        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
555        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };
556
557        Ok((native_opts, rx))
558    }
559}
560
561unsafe impl KafkaDrop for RDKafkaAdminOptions {
562    const TYPE: &'static str = "admin options";
563    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
564}
565
566type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;
567
568fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
569    match res.into() {
570        RDKafkaErrorCode::NoError => Ok(()),
571        RDKafkaErrorCode::InvalidArgument => {
572            let msg = if err_buf.len() == 0 {
573                "invalid argument".into()
574            } else {
575                err_buf.to_string()
576            };
577            Err(KafkaError::AdminOpCreation(msg))
578        }
579        res => Err(KafkaError::AdminOpCreation(format!(
580            "setting admin options returned unexpected error code {}",
581            res
582        ))),
583    }
584}
585
586//
587// ********** RESPONSE HANDLING **********
588//
589
590/// The result of an individual CreateTopic, DeleteTopic, or
591/// CreatePartition operation.
592pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;
593
594fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
595    let mut out = Vec::with_capacity(n);
596    for i in 0..n {
597        let topic = unsafe { *topics.add(i) };
598        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
599        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
600        if err.is_error() {
601            out.push(Err((name, err.into())));
602        } else {
603            out.push(Ok(name));
604        }
605    }
606    out
607}
608
609/// The result of a DeleteGroup operation.
610pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;
611
612fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
613    let mut out = Vec::with_capacity(n);
614    for i in 0..n {
615        let group = unsafe { *groups.add(i) };
616        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
617        let err = unsafe {
618            let err = rdsys::rd_kafka_group_result_error(group);
619            rdsys::rd_kafka_error_code(err)
620        };
621        if err.is_error() {
622            out.push(Err((name, err.into())));
623        } else {
624            out.push(Ok(name));
625        }
626    }
627    out
628}
629
630//
631// Create topic handling
632//
633
634/// Configuration for a CreateTopic operation.
635#[derive(Debug)]
636pub struct NewTopic<'a> {
637    /// The name of the new topic.
638    pub name: &'a str,
639    /// The initial number of partitions.
640    pub num_partitions: i32,
641    /// The initial replication configuration.
642    pub replication: TopicReplication<'a>,
643    /// The initial configuration parameters for the topic.
644    pub config: Vec<(&'a str, &'a str)>,
645}
646
647impl<'a> NewTopic<'a> {
648    /// Creates a new `NewTopic`.
649    pub fn new(
650        name: &'a str,
651        num_partitions: i32,
652        replication: TopicReplication<'a>,
653    ) -> NewTopic<'a> {
654        NewTopic {
655            name,
656            num_partitions,
657            replication,
658            config: Vec::new(),
659        }
660    }
661
662    /// Sets a new parameter in the initial topic configuration.
663    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
664        self.config.push((key, value));
665        self
666    }
667
668    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
669        let name = CString::new(self.name)?;
670        let repl = match self.replication {
671            TopicReplication::Fixed(n) => n,
672            TopicReplication::Variable(partitions) => {
673                if partitions.len() as i32 != self.num_partitions {
674                    return Err(KafkaError::AdminOpCreation(format!(
675                        "replication configuration for topic '{}' assigns {} partition(s), \
676                         which does not match the specified number of partitions ({})",
677                        self.name,
678                        partitions.len(),
679                        self.num_partitions,
680                    )));
681                }
682                -1
683            }
684        };
685        // N.B.: we wrap topic immediately, so that it is destroyed via the
686        // NativeNewTopic's Drop implementation if replica assignment or config
687        // installation fails.
688        let topic = unsafe {
689            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
690                name.as_ptr(),
691                self.num_partitions,
692                repl,
693                err_buf.as_mut_ptr(),
694                err_buf.capacity(),
695            ))
696        }
697        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
698
699        if let TopicReplication::Variable(assignment) = self.replication {
700            for (partition_id, broker_ids) in assignment.iter().enumerate() {
701                let res = unsafe {
702                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
703                        topic.ptr(),
704                        partition_id as i32,
705                        broker_ids.as_ptr() as *mut i32,
706                        broker_ids.len(),
707                        err_buf.as_mut_ptr(),
708                        err_buf.capacity(),
709                    )
710                };
711                check_rdkafka_invalid_arg(res, err_buf)?;
712            }
713        }
714        for (key, val) in &self.config {
715            let key_c = CString::new(*key)?;
716            let val_c = CString::new(*val)?;
717            let res = unsafe {
718                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
719            };
720            check_rdkafka_invalid_arg(res, err_buf)?;
721        }
722        Ok(topic)
723    }
724}
725
/// An assignment of partitions to replicas.
///
/// The outer slice is indexed by partition: element `i` describes partition
/// `i`. Each inner slice lists the broker IDs to which replicas of that
/// partition should be assigned.
pub type PartitionAssignment<'a> = &'a [&'a [i32]];

/// Replication configuration for a new topic.
#[derive(Debug)]
pub enum TopicReplication<'a> {
    /// Use one fixed replication factor for every partition.
    Fixed(i32),
    /// Use the explicit per-partition replica assignment given by the
    /// `PartitionAssignment`.
    Variable(PartitionAssignment<'a>),
}
742
743type NativeNewTopic = NativePtr<RDKafkaNewTopic>;
744
745unsafe impl KafkaDrop for RDKafkaNewTopic {
746    const TYPE: &'static str = "new topic";
747    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
748}
749
750struct CreateTopicsFuture {
751    rx: oneshot::Receiver<NativeEvent>,
752}
753
754impl Future for CreateTopicsFuture {
755    type Output = KafkaResult<Vec<TopicResult>>;
756
757    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
758        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
759        event.check_error()?;
760        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
761        if res.is_null() {
762            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
763            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
764                "create topics request received response of incorrect type ({})",
765                typ
766            ))));
767        }
768        let mut n = 0;
769        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
770        Poll::Ready(Ok(build_topic_results(topics, n)))
771    }
772}
773
774//
775// Delete topic handling
776//
777
778type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;
779
780unsafe impl KafkaDrop for RDKafkaDeleteTopic {
781    const TYPE: &'static str = "delete topic";
782    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
783}
784
785struct DeleteTopicsFuture {
786    rx: oneshot::Receiver<NativeEvent>,
787}
788
789impl Future for DeleteTopicsFuture {
790    type Output = KafkaResult<Vec<TopicResult>>;
791
792    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
793        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
794        event.check_error()?;
795        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
796        if res.is_null() {
797            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
798            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
799                "delete topics request received response of incorrect type ({})",
800                typ
801            ))));
802        }
803        let mut n = 0;
804        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
805        Poll::Ready(Ok(build_topic_results(topics, n)))
806    }
807}
808
809//
810// Delete group handling
811//
812
813type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;
814
815unsafe impl KafkaDrop for RDKafkaDeleteGroup {
816    const TYPE: &'static str = "delete group";
817    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
818}
819
820struct DeleteGroupsFuture {
821    rx: oneshot::Receiver<NativeEvent>,
822}
823
824impl Future for DeleteGroupsFuture {
825    type Output = KafkaResult<Vec<GroupResult>>;
826
827    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
828        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
829        event.check_error()?;
830        let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
831        if res.is_null() {
832            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
833            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
834                "delete groups request received response of incorrect type ({})",
835                typ
836            ))));
837        }
838        let mut n = 0;
839        let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
840        Poll::Ready(Ok(build_group_results(groups, n)))
841    }
842}
843
844//
845// Create partitions handling
846//
847
848/// Configuration for a CreatePartitions operation.
849pub struct NewPartitions<'a> {
850    /// The name of the topic to which partitions should be added.
851    pub topic_name: &'a str,
852    /// The total number of partitions after the operation completes.
853    pub new_partition_count: usize,
854    /// The replica assignments for the new partitions.
855    pub assignment: Option<PartitionAssignment<'a>>,
856}
857
858impl<'a> NewPartitions<'a> {
859    /// Creates a new `NewPartitions`.
860    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
861        NewPartitions {
862            topic_name,
863            new_partition_count,
864            assignment: None,
865        }
866    }
867
868    /// Sets the partition replica assignment for the new partitions. Only
869    /// assignments for newly created replicas should be included.
870    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'a> {
871        self.assignment = Some(assignment);
872        self
873    }
874
875    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
876        let name = CString::new(self.topic_name)?;
877        if let Some(assignment) = self.assignment {
878            // If assignment contains more than self.new_partition_count
879            // entries, we'll trip an assertion in librdkafka that crashes the
880            // process. Note that this check isn't a guarantee that the
881            // partition assignment is valid, since the assignment should only
882            // contain entries for the *new* partitions added, and not any
883            // existing partitions, but we can let the server handle that
884            // validation--we just need to make sure not to crash librdkafka.
885            if assignment.len() > self.new_partition_count {
886                return Err(KafkaError::AdminOpCreation(format!(
887                    "partition assignment for topic '{}' assigns {} partition(s), \
888                     which is more than the requested total number of partitions ({})",
889                    self.topic_name,
890                    assignment.len(),
891                    self.new_partition_count,
892                )));
893            }
894        }
895        // N.B.: we wrap partition immediately, so that it is destroyed via
896        // NativeNewPartitions's Drop implementation if replica assignment or
897        // config installation fails.
898        let partitions = unsafe {
899            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
900                name.as_ptr(),
901                self.new_partition_count,
902                err_buf.as_mut_ptr(),
903                err_buf.capacity(),
904            ))
905        }
906        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
907
908        if let Some(assignment) = self.assignment {
909            for (partition_id, broker_ids) in assignment.iter().enumerate() {
910                let res = unsafe {
911                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
912                        partitions.ptr(),
913                        partition_id as i32,
914                        broker_ids.as_ptr() as *mut i32,
915                        broker_ids.len(),
916                        err_buf.as_mut_ptr(),
917                        err_buf.capacity(),
918                    )
919                };
920                check_rdkafka_invalid_arg(res, err_buf)?;
921            }
922        }
923        Ok(partitions)
924    }
925}
926
/// Handle to a native `RDKafkaNewPartitions`, wrapped in `NativePtr`; this is
/// what `NewPartitions::to_native` returns.
type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;

// Associates `RDKafkaNewPartitions` with a human-readable type label and with
// `rd_kafka_NewPartitions_destroy` as its native destructor.
// NOTE(review): presumably `NativePtr` calls `DROP` to free the object and
// uses `TYPE` for diagnostics -- confirm against `crate::util::KafkaDrop`.
unsafe impl KafkaDrop for RDKafkaNewPartitions {
    const TYPE: &'static str = "new partitions";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}
933
934struct CreatePartitionsFuture {
935    rx: oneshot::Receiver<NativeEvent>,
936}
937
938impl Future for CreatePartitionsFuture {
939    type Output = KafkaResult<Vec<TopicResult>>;
940
941    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
942        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
943        event.check_error()?;
944        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
945        if res.is_null() {
946            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
947            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
948                "create partitions request received response of incorrect type ({})",
949                typ
950            ))));
951        }
952        let mut n = 0;
953        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
954        Poll::Ready(Ok(build_topic_results(topics, n)))
955    }
956}
957
958//
959// Describe configs handling
960//
961
/// The result of an individual DescribeConfig operation.
///
/// `Ok` carries the described `ConfigResource`; `Err` carries an
/// `RDKafkaErrorCode`.
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;
964
/// Specification of a configurable resource.
///
/// This type borrows resource names; `OwnedResourceSpecifier` is the
/// counterpart that owns its data.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
    /// A topic resource, identified by its name.
    Topic(&'a str),
    /// A group resource, identified by its ID.
    Group(&'a str),
    /// A broker resource, identified by its ID. `AlterConfig::to_native`
    /// passes the ID to librdkafka formatted as a decimal string.
    Broker(i32),
}
975
/// A `ResourceSpecifier` that owns its data.
///
/// `extract_config_specifier` builds values of this type from native config
/// resources.
#[derive(Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
    /// A topic resource, identified by its name.
    Topic(String),
    /// A group resource, identified by its ID.
    Group(String),
    /// A broker resource, identified by its ID. When decoded from a native
    /// resource, the ID is parsed from the resource name as an `i32`.
    Broker(i32),
}
986
/// The source of a configuration entry.
///
/// Each variant mirrors one native `RDKafkaConfigSource` constant; the mapping
/// is performed by `extract_config_source`.
#[derive(Debug, Eq, PartialEq)]
pub enum ConfigSource {
    /// Unknown. Note that Kafka brokers before v1.1.0 do not reliably provide
    /// configuration source information.
    ///
    /// Mapped from `RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG`.
    Unknown,
    /// A dynamic topic configuration
    /// (`RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG`).
    DynamicTopic,
    /// A dynamic broker configuration
    /// (`RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG`).
    DynamicBroker,
    /// The default dynamic broker configuration
    /// (`RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG`).
    DynamicDefaultBroker,
    /// The static broker configuration
    /// (`RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG`).
    StaticBroker,
    /// The hardcoded default configuration
    /// (`RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG`).
    Default,
}
1004
/// An individual configuration parameter for a `ConfigResource`.
///
/// Instances are populated from native config entries when a DescribeConfigs
/// response is decoded.
#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    /// The name of the configuration parameter.
    pub name: String,
    /// The value of the configuration parameter. This is `None` when
    /// librdkafka returns a null pointer for the entry's value.
    pub value: Option<String>,
    /// The source of the configuration parameter.
    pub source: ConfigSource,
    /// Whether the configuration parameter is read only.
    pub is_read_only: bool,
    /// Whether the configuration parameter currently has the default value.
    pub is_default: bool,
    /// Whether the configuration parameter contains sensitive data.
    pub is_sensitive: bool,
}
1021
1022/// A configurable resource and its current configuration values.
1023#[derive(Debug)]
1024pub struct ConfigResource {
1025    /// Identifies the resource.
1026    pub specifier: OwnedResourceSpecifier,
1027    /// The current configuration parameters.
1028    pub entries: Vec<ConfigEntry>,
1029}
1030
1031impl ConfigResource {
1032    /// Builds a `HashMap` of configuration entries, keyed by configuration
1033    /// entry name.
1034    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
1035        self.entries.iter().map(|e| (&*e.name, e)).collect()
1036    }
1037
1038    /// Searches the configuration entries to find the named parameter.
1039    ///
1040    /// For more efficient lookups, use `entry_map` to build a `HashMap`
1041    /// instead.
1042    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
1043        self.entries.iter().find(|e| e.name == name)
1044    }
1045}
1046
/// Handle to a native `RDKafkaConfigResource`, wrapped in `NativePtr`; this is
/// what `AlterConfig::to_native` returns.
type NativeConfigResource = NativePtr<RDKafkaConfigResource>;

// Associates `RDKafkaConfigResource` with a human-readable type label and with
// `rd_kafka_ConfigResource_destroy` as its native destructor.
// NOTE(review): presumably `NativePtr` calls `DROP` to free the object and
// uses `TYPE` for diagnostics -- confirm against `crate::util::KafkaDrop`.
unsafe impl KafkaDrop for RDKafkaConfigResource {
    const TYPE: &'static str = "config resource";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
}
1053
1054fn extract_config_specifier(
1055    resource: *const RDKafkaConfigResource,
1056) -> KafkaResult<OwnedResourceSpecifier> {
1057    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
1058    match typ {
1059        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
1060            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1061            Ok(OwnedResourceSpecifier::Topic(name))
1062        }
1063        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
1064            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1065            Ok(OwnedResourceSpecifier::Group(name))
1066        }
1067        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
1068            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
1069                .to_string_lossy();
1070            match name.parse::<i32>() {
1071                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
1072                Err(_) => Err(KafkaError::AdminOpCreation(format!(
1073                    "bogus broker ID in kafka response: {}",
1074                    name
1075                ))),
1076            }
1077        }
1078        _ => Err(KafkaError::AdminOpCreation(format!(
1079            "bogus resource type in kafka response: {:?}",
1080            typ
1081        ))),
1082    }
1083}
1084
1085fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
1086    match config_source {
1087        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
1088        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
1089            Ok(ConfigSource::DynamicTopic)
1090        }
1091        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
1092            Ok(ConfigSource::DynamicBroker)
1093        }
1094        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
1095            Ok(ConfigSource::DynamicDefaultBroker)
1096        }
1097        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
1098            Ok(ConfigSource::StaticBroker)
1099        }
1100        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
1101        _ => Err(KafkaError::AdminOpCreation(format!(
1102            "bogus config source type in kafka response: {:?}",
1103            config_source,
1104        ))),
1105    }
1106}
1107
1108struct DescribeConfigsFuture {
1109    rx: oneshot::Receiver<NativeEvent>,
1110}
1111
1112impl Future for DescribeConfigsFuture {
1113    type Output = KafkaResult<Vec<ConfigResourceResult>>;
1114
1115    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1116        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1117        event.check_error()?;
1118        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
1119        if res.is_null() {
1120            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1121            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1122                "describe configs request received response of incorrect type ({})",
1123                typ
1124            ))));
1125        }
1126        let mut n = 0;
1127        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
1128        let mut out = Vec::with_capacity(n);
1129        for i in 0..n {
1130            let resource = unsafe { *resources.add(i) };
1131            let specifier = extract_config_specifier(resource)?;
1132            let mut entries_out = Vec::new();
1133            let mut n = 0;
1134            let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
1135            for j in 0..n {
1136                let entry = unsafe { *entries.add(j) };
1137                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
1138                let value = unsafe {
1139                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
1140                    if value.is_null() {
1141                        None
1142                    } else {
1143                        Some(cstr_to_owned(value))
1144                    }
1145                };
1146                entries_out.push(ConfigEntry {
1147                    name,
1148                    value,
1149                    source: extract_config_source(unsafe {
1150                        rdsys::rd_kafka_ConfigEntry_source(entry)
1151                    })?,
1152                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
1153                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
1154                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
1155                });
1156            }
1157            out.push(Ok(ConfigResource {
1158                specifier,
1159                entries: entries_out,
1160            }))
1161        }
1162        Poll::Ready(Ok(out))
1163    }
1164}
1165
1166//
1167// Alter configs handling
1168//
1169
/// The result of an individual AlterConfig operation.
///
/// `Ok` carries the specifier of the altered resource; `Err` carries the
/// specifier together with an `RDKafkaErrorCode`.
pub type AlterConfigsResult =
    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
1173
1174/// Configuration for an AlterConfig operation.
1175pub struct AlterConfig<'a> {
1176    /// Identifies the resource to be altered.
1177    pub specifier: ResourceSpecifier<'a>,
1178    /// The configuration parameters to be updated.
1179    pub entries: HashMap<&'a str, &'a str>,
1180}
1181
1182impl<'a> AlterConfig<'a> {
1183    /// Creates a new `AlterConfig`.
1184    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
1185        AlterConfig {
1186            specifier,
1187            entries: HashMap::new(),
1188        }
1189    }
1190
1191    /// Sets the configuration parameter named `key` to the specified `value`.
1192    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
1193        self.entries.insert(key, value);
1194        self
1195    }
1196
1197    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
1198        let (name, typ) = match self.specifier {
1199            ResourceSpecifier::Topic(name) => (
1200                CString::new(name)?,
1201                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
1202            ),
1203            ResourceSpecifier::Group(name) => (
1204                CString::new(name)?,
1205                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
1206            ),
1207            ResourceSpecifier::Broker(id) => (
1208                CString::new(format!("{}", id))?,
1209                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
1210            ),
1211        };
1212        // N.B.: we wrap config immediately, so that it is destroyed via the
1213        // NativeNewTopic's Drop implementation if config installation fails.
1214        let config = unsafe {
1215            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
1216                .unwrap()
1217        };
1218        for (key, val) in &self.entries {
1219            let key_c = CString::new(*key)?;
1220            let val_c = CString::new(*val)?;
1221            let res = unsafe {
1222                rdsys::rd_kafka_ConfigResource_set_config(
1223                    config.ptr(),
1224                    key_c.as_ptr(),
1225                    val_c.as_ptr(),
1226                )
1227            };
1228            check_rdkafka_invalid_arg(res, err_buf)?;
1229        }
1230        Ok(config)
1231    }
1232}
1233
1234struct AlterConfigsFuture {
1235    rx: oneshot::Receiver<NativeEvent>,
1236}
1237
1238impl Future for AlterConfigsFuture {
1239    type Output = KafkaResult<Vec<AlterConfigsResult>>;
1240
1241    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1242        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1243        event.check_error()?;
1244        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
1245        if res.is_null() {
1246            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1247            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1248                "alter configs request received response of incorrect type ({})",
1249                typ
1250            ))));
1251        }
1252        let mut n = 0;
1253        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
1254        let mut out = Vec::with_capacity(n);
1255        for i in 0..n {
1256            let resource = unsafe { *resources.add(i) };
1257            let specifier = extract_config_specifier(resource)?;
1258            out.push(Ok(specifier));
1259        }
1260        Poll::Ready(Ok(out))
1261    }
1262}