use std::collections::HashMap;
use std::ffi::{c_void, CStr, CString};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use futures_util::ready;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};

/// A client of the Kafka admin API.
///
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
/// notably the ability to create and delete topics.
pub struct AdminClient<C: ClientContext> {
    client: Client<C>,
    queue: Arc<NativeQueue>,
    should_stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl<C: ClientContext> AdminClient<C> {
    /// Creates new topics according to the provided `NewTopic` specifications.
    ///
    /// The operation is not transactional: some topics may be created while
    /// others fail, so check each entry of the returned `Vec<TopicResult>`.
    pub fn create_topics<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        match self.create_topics_inner(topics, opts) {
            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_topics_inner<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for t in topics {
            native_topics.push(t.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreateTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Deletes the named topics.
    ///
    /// The operation is not transactional: some topics may be deleted while
    /// others fail, so check each entry of the returned `Vec<TopicResult>`.
    pub fn delete_topics(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
        match self.delete_topics_inner(topic_names, opts) {
            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_topics_inner(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for tn in topic_names {
            let tn_c = CString::new(*tn)?;
            let native_topic = unsafe {
                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
            };
            native_topics.push(native_topic);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Deletes the named consumer groups.
    pub fn delete_groups(
        &self,
        group_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
        match self.delete_groups_inner(group_names, opts) {
            Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_groups_inner(
        &self,
        group_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_groups = Vec::new();
        let mut err_buf = ErrBuf::new();
        for gn in group_names {
            let gn_t = CString::new(*gn)?;
            let native_group = unsafe {
                NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
            };
            native_groups.push(native_group);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;

        unsafe {
            rdsys::rd_kafka_DeleteGroups(
                self.client.native_ptr(),
                native_groups.as_c_array(),
                native_groups.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            )
        }
        Ok(rx)
    }

    /// Adds additional partitions to existing topics according to the
    /// provided `NewPartitions` specifications.
    ///
    /// The operation is not transactional: some topics may be grown while
    /// others fail, so check each entry of the returned `Vec<TopicResult>`.
    pub fn create_partitions<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        match self.create_partitions_inner(partitions, opts) {
            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_partitions_inner<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        let mut native_partitions = Vec::new();
        let mut err_buf = ErrBuf::new();
        for p in partitions {
            native_partitions.push(p.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreatePartitions(
                self.client.native_ptr(),
                native_partitions.as_c_array(),
                native_partitions.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Retrieves the configuration parameters for the specified resources.
    pub fn describe_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        match self.describe_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn describe_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            let (name, typ) = match c {
                ResourceSpecifier::Topic(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
                ),
                ResourceSpecifier::Group(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
                ),
                ResourceSpecifier::Broker(id) => (
                    CString::new(format!("{}", id))?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
                ),
            };
            native_configs.push(unsafe {
                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
                    typ,
                    name.as_ptr(),
                ))
                .unwrap()
            });
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DescribeConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Sets configuration parameters for the specified resources.
    pub fn alter_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        match self.alter_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn alter_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            native_configs.push(c.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_AlterConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Returns the underlying `Client`.
    pub fn inner(&self) -> &Client<C> {
        &self.client
    }
}
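
// Illustrative usage sketch, not exercised by this module: how a caller might
// construct an `AdminClient` and create a topic with it. The bootstrap
// address, topic name, and timeout below are placeholder assumptions, and an
// async runtime (e.g. Tokio) is assumed to drive the returned future.
#[allow(dead_code)]
async fn example_create_topic() -> KafkaResult<()> {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092");

    // `from_config` also spawns the background polling thread that completes
    // the admin futures returned by the methods above.
    let admin: AdminClient<DefaultClientContext> = AdminClient::from_config(&config).await?;

    let topic = NewTopic::new("example-topic", 3, TopicReplication::Fixed(1));
    let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(5)));
    for result in admin.create_topics(&[topic], &opts).await? {
        match result {
            Ok(name) => trace!("created topic {}", name),
            Err((name, err)) => warn!("failed to create topic {}: {}", name, err),
        }
    }
    Ok(())
}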

#[async_trait::async_trait]
impl FromClientConfig for AdminClient<DefaultClientContext> {
    async fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
        AdminClient::from_config_and_context(config, DefaultClientContext).await
    }
}

#[async_trait::async_trait]
impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
    async fn from_config_and_context(
        config: &ClientConfig,
        context: C,
    ) -> KafkaResult<AdminClient<C>> {
        let native_config = config.create_native_config()?;
        // librdkafka only exposes consumer and producer handles; the admin API
        // works with either, and a producer handle is used here by convention.
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;
        let queue = Arc::new(client.new_native_queue());
        let should_stop = Arc::new(AtomicBool::new(false));
        let handle = start_poll_thread(queue.clone(), should_stop.clone());
        Ok(AdminClient {
            client,
            queue,
            should_stop,
            handle: Some(handle),
        })
    }
}

impl<C: ClientContext> Drop for AdminClient<C> {
    fn drop(&mut self) {
        trace!("Stopping polling");
        self.should_stop.store(true, Ordering::Relaxed);
        trace!("Waiting for polling thread termination");
        match self.handle.take().unwrap().join() {
            Ok(()) => trace!("Polling stopped"),
            Err(e) => warn!("Failure while terminating thread: {:?}", e),
        };
    }
}

fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("admin client polling thread".into())
        .spawn(move || {
            trace!("Admin polling thread loop started");
            loop {
                let event = queue.poll(Duration::from_millis(100));
                if event.is_null() {
                    if should_stop.load(Ordering::Relaxed) {
                        // We received a shutdown signal and there are no more
                        // messages pending, so we terminate the thread.
                        break;
                    }
                    continue;
                }
                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
                // The event opaque is the boxed sending half of the oneshot
                // channel installed by `AdminOptions::to_native`; reconstruct
                // it and hand the event to the waiting admin future.
                let tx: Box<oneshot::Sender<NativeEvent>> =
                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
                let _ = tx.send(event);
            }
            trace!("Admin polling thread loop terminated");
        })
        .expect("Failed to start polling thread")
}

type NativeEvent = NativePtr<RDKafkaEvent>;

unsafe impl KafkaDrop for RDKafkaEvent {
    const TYPE: &'static str = "event";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
}

unsafe impl Send for NativeEvent {}
unsafe impl Sync for NativeEvent {}

impl NativePtr<RDKafkaEvent> {
    fn check_error(&self) -> KafkaResult<()> {
        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
        if err.is_error() {
            Err(KafkaError::AdminOp(err.into()))
        } else {
            Ok(())
        }
    }
}

/// Options for an admin API request.
#[derive(Default)]
pub struct AdminOptions {
    request_timeout: Option<Timeout>,
    operation_timeout: Option<Timeout>,
    validate_only: bool,
    broker_id: Option<i32>,
}

impl AdminOptions {
    /// Creates a new `AdminOptions`.
    pub fn new() -> AdminOptions {
        AdminOptions::default()
    }

    /// Sets the overall request timeout, including broker lookup, request
    /// transmission, operation time on the broker, and response.
    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.request_timeout = timeout.map(Into::into);
        self
    }

    /// Sets the broker's operation timeout, i.e., how long the broker will
    /// wait for the operation to propagate across the cluster before
    /// replying with a result.
    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.operation_timeout = timeout.map(Into::into);
        self
    }

    /// Tells the broker to only validate the request, without actually
    /// performing the requested operation.
    pub fn validate_only(mut self, validate_only: bool) -> Self {
        self.validate_only = validate_only;
        self
    }

    /// Overrides which broker the admin request is sent to. By default an
    /// appropriate broker is selected automatically.
    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
        self.broker_id = broker_id.into();
        self
    }

    fn to_native(
        &self,
        client: *mut RDKafka,
        err_buf: &mut ErrBuf,
    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
        let native_opts = unsafe {
            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
                client,
                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
            ))
            .unwrap()
        };

        if let Some(timeout) = self.request_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_request_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(timeout) = self.operation_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if self.validate_only {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_validate_only(
                    native_opts.ptr(),
                    1, // true
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(broker_id) = self.broker_id {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_broker(
                    native_opts.ptr(),
                    broker_id,
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        let (tx, rx) = oneshot::channel();
        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };

        Ok((native_opts, rx))
    }
}
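
// A minimal sketch of composing `AdminOptions` with the builder methods
// above. The 30-second timeout and the dry-run flag are arbitrary
// illustrative choices, not defaults of this module.
#[allow(dead_code)]
fn example_admin_options() -> AdminOptions {
    AdminOptions::new()
        // Let the broker wait up to 30s for the operation to propagate.
        .operation_timeout(Some(Duration::from_secs(30)))
        // Validate the request only; no changes are applied on the broker.
        .validate_only(true)
}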

unsafe impl KafkaDrop for RDKafkaAdminOptions {
    const TYPE: &'static str = "admin options";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
}

type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;

fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
    match res.into() {
        RDKafkaErrorCode::NoError => Ok(()),
        RDKafkaErrorCode::InvalidArgument => {
            let msg = if err_buf.len() == 0 {
                "invalid argument".into()
            } else {
                err_buf.to_string()
            };
            Err(KafkaError::AdminOpCreation(msg))
        }
        res => Err(KafkaError::AdminOpCreation(format!(
            "setting admin options returned unexpected error code {}",
            res
        ))),
    }
}

/// The result of an individual CreateTopics, DeleteTopics, or
/// CreatePartitions operation: the topic name on success, or the topic name
/// and the error that occurred on failure.
pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        let topic = unsafe { *topics.add(i) };
        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
        if err.is_error() {
            out.push(Err((name, err.into())));
        } else {
            out.push(Ok(name));
        }
    }
    out
}

/// The result of an individual DeleteGroups operation: the group name on
/// success, or the group name and the error that occurred on failure.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        let group = unsafe { *groups.add(i) };
        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
        let err = unsafe {
            let err = rdsys::rd_kafka_group_result_error(group);
            rdsys::rd_kafka_error_code(err)
        };
        if err.is_error() {
            out.push(Err((name, err.into())));
        } else {
            out.push(Ok(name));
        }
    }
    out
}

/// Configuration for a CreateTopics operation.
#[derive(Debug)]
pub struct NewTopic<'a> {
    /// The name of the new topic.
    pub name: &'a str,
    /// The initial number of partitions.
    pub num_partitions: i32,
    /// The replication configuration.
    pub replication: TopicReplication<'a>,
    /// The initial configuration parameters for the topic.
    pub config: Vec<(&'a str, &'a str)>,
}

impl<'a> NewTopic<'a> {
    /// Creates a new `NewTopic`.
    pub fn new(
        name: &'a str,
        num_partitions: i32,
        replication: TopicReplication<'a>,
    ) -> NewTopic<'a> {
        NewTopic {
            name,
            num_partitions,
            replication,
            config: Vec::new(),
        }
    }

    /// Sets a new parameter in the initial topic configuration.
    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
        self.config.push((key, value));
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
        let name = CString::new(self.name)?;
        let repl = match self.replication {
            TopicReplication::Fixed(n) => n,
            TopicReplication::Variable(partitions) => {
                if partitions.len() as i32 != self.num_partitions {
                    return Err(KafkaError::AdminOpCreation(format!(
                        "replication configuration for topic '{}' assigns {} partition(s), \
                         which does not match the specified number of partitions ({})",
                        self.name,
                        partitions.len(),
                        self.num_partitions,
                    )));
                }
                -1
            }
        };
        let topic = unsafe {
            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
                name.as_ptr(),
                self.num_partitions,
                repl,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let TopicReplication::Variable(assignment) = self.replication {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
                        topic.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        for (key, val) in &self.config {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(topic)
    }
}
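
// Illustrative `NewTopic` values; the topic names and broker IDs below are
// placeholders. With `TopicReplication::Variable`, the outer slice must have
// exactly `num_partitions` entries or `to_native` rejects the specification.
#[allow(dead_code)]
fn example_new_topics() {
    // Three partitions, replication factor 1, plus a per-topic config override.
    let _compacted = NewTopic::new("example-compacted", 3, TopicReplication::Fixed(1))
        .set("cleanup.policy", "compact");

    // Two partitions with explicit replica assignments: partition 0 on
    // brokers 1 and 2, partition 1 on brokers 2 and 3.
    let _assigned = NewTopic::new(
        "example-assigned",
        2,
        TopicReplication::Variable(&[&[1, 2], &[2, 3]]),
    );
}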

/// An assignment of partitions to replicas.
///
/// Each entry in the outer slice corresponds to one partition, in order; the
/// inner slice lists the IDs of the brokers that should host that
/// partition's replicas.
pub type PartitionAssignment<'a> = &'a [&'a [i32]];

/// The replication configuration for a new topic.
#[derive(Debug)]
pub enum TopicReplication<'a> {
    /// All partitions should use the same fixed replication factor.
    Fixed(i32),
    /// Each partition should use the replica assignment from
    /// `PartitionAssignment`.
    Variable(PartitionAssignment<'a>),
}

type NativeNewTopic = NativePtr<RDKafkaNewTopic>;

unsafe impl KafkaDrop for RDKafkaNewTopic {
    const TYPE: &'static str = "new topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
}

struct CreateTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreateTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;

unsafe impl KafkaDrop for RDKafkaDeleteTopic {
    const TYPE: &'static str = "delete topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
}

struct DeleteTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;

unsafe impl KafkaDrop for RDKafkaDeleteGroup {
    const TYPE: &'static str = "delete group";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
}

struct DeleteGroupsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteGroupsFuture {
    type Output = KafkaResult<Vec<GroupResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete groups request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
        Poll::Ready(Ok(build_group_results(groups, n)))
    }
}

/// Configuration for a CreatePartitions operation.
pub struct NewPartitions<'a> {
    /// The name of the topic to which partitions should be added.
    pub topic_name: &'a str,
    /// The total number of partitions the topic should have after the
    /// operation completes.
    pub new_partition_count: usize,
    /// The replica assignments for the new partitions.
    pub assignment: Option<PartitionAssignment<'a>>,
}

impl<'a> NewPartitions<'a> {
    /// Creates a new `NewPartitions`.
    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
        NewPartitions {
            topic_name,
            new_partition_count,
            assignment: None,
        }
    }

    /// Sets the partition replica assignment for the new partitions. Only
    /// assignments for the newly created partitions should be specified.
    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'a> {
        self.assignment = Some(assignment);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
        let name = CString::new(self.topic_name)?;
        if let Some(assignment) = self.assignment {
            if assignment.len() > self.new_partition_count {
                return Err(KafkaError::AdminOpCreation(format!(
                    "partition assignment for topic '{}' assigns {} partition(s), \
                     which is more than the requested total number of partitions ({})",
                    self.topic_name,
                    assignment.len(),
                    self.new_partition_count,
                )));
            }
        }
        let partitions = unsafe {
            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
                name.as_ptr(),
                self.new_partition_count,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let Some(assignment) = self.assignment {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
                        partitions.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        Ok(partitions)
    }
}
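
// Illustrative `NewPartitions` values; the topic name and broker IDs are
// placeholders. The count passed to `new` is the desired *total* number of
// partitions, and an optional assignment covers only the new partitions.
#[allow(dead_code)]
fn example_new_partitions() {
    // Grow the topic to five partitions and let the brokers pick replicas.
    let _grown = NewPartitions::new("example-topic", 5);

    // Grow to five partitions and, assuming the topic currently has three,
    // pin the two new partitions to brokers 1 and 2 respectively.
    let _pinned = NewPartitions::new("example-topic", 5).assign(&[&[1], &[2]]);
}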

type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;

unsafe impl KafkaDrop for RDKafkaNewPartitions {
    const TYPE: &'static str = "new partitions";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}

struct CreatePartitionsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreatePartitionsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create partitions request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

/// The result of an individual DescribeConfigs operation.
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;

/// Specification of a configurable resource.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
    /// A topic resource, identified by its name.
    Topic(&'a str),
    /// A group resource, identified by its ID.
    Group(&'a str),
    /// A broker resource, identified by its ID.
    Broker(i32),
}

/// A `ResourceSpecifier` that owns its data.
#[derive(Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
    /// A topic resource, identified by its name.
    Topic(String),
    /// A group resource, identified by its ID.
    Group(String),
    /// A broker resource, identified by its ID.
    Broker(i32),
}

/// The source of a configuration entry.
#[derive(Debug, Eq, PartialEq)]
pub enum ConfigSource {
    /// The source is unknown.
    Unknown,
    /// A dynamic topic configuration.
    DynamicTopic,
    /// A dynamic broker configuration.
    DynamicBroker,
    /// The default dynamic broker configuration.
    DynamicDefaultBroker,
    /// A static broker configuration.
    StaticBroker,
    /// The hardcoded default configuration.
    Default,
}

/// An individual configuration parameter.
#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    /// The name of the parameter.
    pub name: String,
    /// The value of the parameter, if any.
    pub value: Option<String>,
    /// The source of the parameter.
    pub source: ConfigSource,
    /// Whether the parameter is read only.
    pub is_read_only: bool,
    /// Whether the parameter is set to its default value.
    pub is_default: bool,
    /// Whether the parameter is sensitive (e.g., a password).
    pub is_sensitive: bool,
}

/// A configuration resource and the parameters that describe it.
#[derive(Debug)]
pub struct ConfigResource {
    /// Identifies the resource.
    pub specifier: OwnedResourceSpecifier,
    /// The configuration parameters associated with the resource.
    pub entries: Vec<ConfigEntry>,
}

impl ConfigResource {
    /// Builds a `HashMap` of the configuration entries, keyed by parameter
    /// name.
    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
        self.entries.iter().map(|e| (&*e.name, e)).collect()
    }

    /// Searches the configuration entries for the named parameter. For
    /// repeated lookups, `entry_map` is more efficient.
    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
        self.entries.iter().find(|e| e.name == name)
    }
}
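
// A small sketch of inspecting a `ConfigResource` returned by
// `describe_configs`; the parameter name below is just an example.
#[allow(dead_code)]
fn example_read_entry(resource: &ConfigResource) -> Option<String> {
    // `get` scans the entries linearly; prefer `entry_map` when many
    // parameters will be looked up on the same resource.
    resource.get("retention.ms").and_then(|entry| entry.value.clone())
}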

type NativeConfigResource = NativePtr<RDKafkaConfigResource>;

unsafe impl KafkaDrop for RDKafkaConfigResource {
    const TYPE: &'static str = "config resource";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
}

fn extract_config_specifier(
    resource: *const RDKafkaConfigResource,
) -> KafkaResult<OwnedResourceSpecifier> {
    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
    match typ {
        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
            Ok(OwnedResourceSpecifier::Topic(name))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
            Ok(OwnedResourceSpecifier::Group(name))
        }
        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
                .to_string_lossy();
            match name.parse::<i32>() {
                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
                Err(_) => Err(KafkaError::AdminOpCreation(format!(
                    "bogus broker ID in kafka response: {}",
                    name
                ))),
            }
        }
        _ => Err(KafkaError::AdminOpCreation(format!(
            "bogus resource type in kafka response: {:?}",
            typ
        ))),
    }
}

fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
    match config_source {
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
            Ok(ConfigSource::DynamicTopic)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
            Ok(ConfigSource::DynamicBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
            Ok(ConfigSource::DynamicDefaultBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
            Ok(ConfigSource::StaticBroker)
        }
        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
        _ => Err(KafkaError::AdminOpCreation(format!(
            "bogus config source type in kafka response: {:?}",
            config_source,
        ))),
    }
}

struct DescribeConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DescribeConfigsFuture {
    type Output = KafkaResult<Vec<ConfigResourceResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "describe configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            let mut entries_out = Vec::new();
            let mut n = 0;
            let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
            for j in 0..n {
                let entry = unsafe { *entries.add(j) };
                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
                let value = unsafe {
                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
                    if value.is_null() {
                        None
                    } else {
                        Some(cstr_to_owned(value))
                    }
                };
                entries_out.push(ConfigEntry {
                    name,
                    value,
                    source: extract_config_source(unsafe {
                        rdsys::rd_kafka_ConfigEntry_source(entry)
                    })?,
                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
                });
            }
            out.push(Ok(ConfigResource {
                specifier,
                entries: entries_out,
            }))
        }
        Poll::Ready(Ok(out))
    }
}

/// The result of an individual AlterConfigs operation.
pub type AlterConfigsResult =
    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;

/// Configuration for an AlterConfigs operation.
pub struct AlterConfig<'a> {
    /// Identifies the resource to be altered.
    pub specifier: ResourceSpecifier<'a>,
    /// The configuration parameters to set on the resource.
    pub entries: HashMap<&'a str, &'a str>,
}

impl<'a> AlterConfig<'a> {
    /// Creates a new `AlterConfig`.
    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
        AlterConfig {
            specifier,
            entries: HashMap::new(),
        }
    }

    /// Sets the configuration parameter named `key` to the specified `value`.
    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
        self.entries.insert(key, value);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
        let (name, typ) = match self.specifier {
            ResourceSpecifier::Topic(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
            ),
            ResourceSpecifier::Group(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
            ),
            ResourceSpecifier::Broker(id) => (
                CString::new(format!("{}", id))?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
            ),
        };
        let config = unsafe {
            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
                .unwrap()
        };
        for (key, val) in &self.entries {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_ConfigResource_set_config(
                    config.ptr(),
                    key_c.as_ptr(),
                    val_c.as_ptr(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(config)
    }
}
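
// Illustrative `AlterConfig` construction; the topic name and parameter
// values are placeholders. The entries set here would be submitted via
// `AdminClient::alter_configs`.
#[allow(dead_code)]
fn example_alter_config() -> AlterConfig<'static> {
    AlterConfig::new(ResourceSpecifier::Topic("example-topic"))
        .set("cleanup.policy", "compact")
        .set("retention.ms", "86400000")
}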

struct AlterConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for AlterConfigsFuture {
    type Output = KafkaResult<Vec<AlterConfigsResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "alter configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            out.push(Ok(specifier));
        }
        Poll::Ready(Ok(out))
    }
}