1use std::collections::HashMap;
6use std::ffi::{CStr, CString};
7use std::fmt;
8use std::slice;
9use std::str;
10
11use libc::c_void;
12use rdkafka_sys as rdsys;
13use rdkafka_sys::types::*;
14
15use crate::error::{IsError, KafkaError, KafkaResult};
16use crate::util::{self, KafkaDrop, NativePtr};
17
/// Partition id passed to librdkafka when a topic is added without naming a
/// specific partition (see `TopicPartitionList::add_topic_unassigned`).
const PARTITION_UNASSIGNED: i32 = -1;

// librdkafka's special offset values, converted to `i64` once so that they can
// be used directly as `match` patterns and operands in `Offset::from_raw` and
// `Offset::to_raw`.
const OFFSET_BEGINNING: i64 = rdsys::RD_KAFKA_OFFSET_BEGINNING as i64;
const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
// Base value from which tail-relative offsets are encoded (`base - n`).
const OFFSET_TAIL_BASE: i64 = rdsys::RD_KAFKA_OFFSET_TAIL_BASE as i64;
25
/// A Kafka offset, either one of librdkafka's special values or a concrete
/// position.
///
/// Use [`Offset::from_raw`] and [`Offset::to_raw`] to convert to and from the
/// `i64` representation used by librdkafka.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Offset {
    /// Corresponds to librdkafka's `RD_KAFKA_OFFSET_BEGINNING`.
    Beginning,
    /// Corresponds to librdkafka's `RD_KAFKA_OFFSET_END`.
    End,
    /// Corresponds to librdkafka's `RD_KAFKA_OFFSET_STORED`.
    Stored,
    /// Corresponds to librdkafka's `RD_KAFKA_OFFSET_INVALID`.
    Invalid,
    /// A specific offset. Only non-negative values can be converted back to a
    /// raw offset by [`Offset::to_raw`].
    Offset(i64),
    /// An offset expressed relative to librdkafka's
    /// `RD_KAFKA_OFFSET_TAIL_BASE`. Only strictly positive values can be
    /// converted back to a raw offset by [`Offset::to_raw`].
    OffsetTail(i64),
}
48
49impl Offset {
50 pub fn from_raw(raw_offset: i64) -> Offset {
53 match raw_offset {
54 OFFSET_BEGINNING => Offset::Beginning,
55 OFFSET_END => Offset::End,
56 OFFSET_STORED => Offset::Stored,
57 OFFSET_INVALID => Offset::Invalid,
58 n if n <= OFFSET_TAIL_BASE => Offset::OffsetTail(-(n - OFFSET_TAIL_BASE)),
59 n => Offset::Offset(n),
60 }
61 }
62
63 pub fn to_raw(self) -> Option<i64> {
69 match self {
70 Offset::Beginning => Some(OFFSET_BEGINNING),
71 Offset::End => Some(OFFSET_END),
72 Offset::Stored => Some(OFFSET_STORED),
73 Offset::Invalid => Some(OFFSET_INVALID),
74 Offset::Offset(n) if n >= 0 => Some(n),
75 Offset::OffsetTail(n) if n > 0 => Some(OFFSET_TAIL_BASE - n),
76 Offset::Offset(_) | Offset::OffsetTail(_) => None,
77 }
78 }
79}
80
/// A view of a single entry of a [`TopicPartitionList`].
///
/// Wraps a mutable borrow of the native `RDKafkaTopicPartition` record; the
/// lifetime `'a` ties the element to the list it was obtained from.
pub struct TopicPartitionListElem<'a> {
    // Borrowed native entry that all accessors read from and write to.
    ptr: &'a mut RDKafkaTopicPartition,
}
86
// Manual assertions that elements may be moved to and shared between threads.
// NOTE(review): soundness depends on how the native record is accessed
// elsewhere; that cannot be verified from this file alone.
unsafe impl Send for TopicPartitionListElem<'_> {}
unsafe impl Sync for TopicPartitionListElem<'_> {}
89
90impl<'a> TopicPartitionListElem<'a> {
91 fn from_ptr(
93 _owner_list: &'a TopicPartitionList,
94 ptr: &'a mut RDKafkaTopicPartition,
95 ) -> TopicPartitionListElem<'a> {
96 TopicPartitionListElem { ptr }
97 }
98
99 pub fn topic(&self) -> &str {
101 unsafe {
102 let c_str = self.ptr.topic;
103 CStr::from_ptr(c_str)
104 .to_str()
105 .expect("Topic name is not UTF-8")
106 }
107 }
108
109 pub fn error(&self) -> KafkaResult<()> {
111 let kafka_err = self.ptr.err;
112 if kafka_err.is_error() {
113 Err(KafkaError::OffsetFetch(kafka_err.into()))
114 } else {
115 Ok(())
116 }
117 }
118
119 pub fn partition(&self) -> i32 {
121 self.ptr.partition
122 }
123
124 pub fn offset(&self) -> Offset {
126 let raw_offset = self.ptr.offset;
127 Offset::from_raw(raw_offset)
128 }
129
130 pub fn set_offset(&mut self, offset: Offset) -> KafkaResult<()> {
132 match offset.to_raw() {
133 Some(offset) => {
134 self.ptr.offset = offset;
135 Ok(())
136 }
137 None => Err(KafkaError::SetPartitionOffset(
138 RDKafkaErrorCode::InvalidArgument,
139 )),
140 }
141 }
142
143 pub fn metadata(&self) -> &str {
145 let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
146 str::from_utf8(bytes).expect("Metadata is not UTF-8")
147 }
148
149 pub fn set_metadata<M>(&mut self, metadata: M)
151 where
152 M: AsRef<str>,
153 {
154 let metadata = metadata.as_ref();
155 let buf = unsafe { libc::malloc(metadata.len()) };
156 unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
157 self.ptr.metadata = buf;
158 self.ptr.metadata_size = metadata.len();
159 }
160}
161
162impl<'a> PartialEq for TopicPartitionListElem<'a> {
163 fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
164 self.topic() == other.topic()
165 && self.partition() == other.partition()
166 && self.offset() == other.offset()
167 && self.metadata() == other.metadata()
168 }
169}
170
/// A list of topic/partition entries, each carrying an offset and optional
/// metadata, backed by a native `RDKafkaTopicPartitionList`.
pub struct TopicPartitionList {
    // Owning handle to the native list.
    ptr: NativePtr<RDKafkaTopicPartitionList>,
}
175
// Registers `rd_kafka_topic_partition_list_destroy` as the destructor for the
// native list type.
// NOTE(review): presumably `NativePtr` invokes `DROP` when it is dropped and
// uses `TYPE` for diagnostics; confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaTopicPartitionList {
    const TYPE: &'static str = "topic partition list";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_partition_list_destroy;
}
180
181impl Clone for TopicPartitionList {
182 fn clone(&self) -> Self {
183 let new_tpl = unsafe { rdsys::rd_kafka_topic_partition_list_copy(self.ptr()) };
184 unsafe { TopicPartitionList::from_ptr(new_tpl) }
185 }
186}
187
188impl TopicPartitionList {
189 pub fn new() -> TopicPartitionList {
191 TopicPartitionList::with_capacity(5)
192 }
193
194 pub fn with_capacity(capacity: usize) -> TopicPartitionList {
196 let ptr = unsafe { rdsys::rd_kafka_topic_partition_list_new(capacity as i32) };
197 unsafe { TopicPartitionList::from_ptr(ptr) }
198 }
199
200 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaTopicPartitionList) -> TopicPartitionList {
203 TopicPartitionList {
204 ptr: NativePtr::from_ptr(ptr).unwrap(),
205 }
206 }
207
208 pub fn from_topic_map(
210 topic_map: &HashMap<(String, i32), Offset>,
211 ) -> KafkaResult<TopicPartitionList> {
212 let mut tpl = TopicPartitionList::with_capacity(topic_map.len());
213 for ((topic_name, partition), offset) in topic_map {
214 tpl.add_partition_offset(topic_name, *partition, *offset)?;
215 }
216 Ok(tpl)
217 }
218
219 pub fn ptr(&self) -> *mut RDKafkaTopicPartitionList {
221 self.ptr.ptr()
222 }
223
224 pub fn count(&self) -> usize {
226 self.ptr.cnt as usize
227 }
228
229 pub fn capacity(&self) -> usize {
231 self.ptr.size as usize
232 }
233
234 pub fn add_topic_unassigned<'a>(&'a mut self, topic: &str) -> TopicPartitionListElem<'a> {
236 self.add_partition(topic, PARTITION_UNASSIGNED)
237 }
238
239 pub fn add_partition<'a>(
241 &'a mut self,
242 topic: &str,
243 partition: i32,
244 ) -> TopicPartitionListElem<'a> {
245 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
246 let tp_ptr = unsafe {
247 rdsys::rd_kafka_topic_partition_list_add(self.ptr(), topic_c.as_ptr(), partition)
248 };
249 unsafe { TopicPartitionListElem::from_ptr(self, &mut *tp_ptr) }
250 }
251
252 pub fn add_partition_range(&mut self, topic: &str, start_partition: i32, stop_partition: i32) {
254 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
255 unsafe {
256 rdsys::rd_kafka_topic_partition_list_add_range(
257 self.ptr(),
258 topic_c.as_ptr(),
259 start_partition,
260 stop_partition,
261 );
262 }
263 }
264
265 pub fn set_partition_offset(
268 &mut self,
269 topic: &str,
270 partition: i32,
271 offset: Offset,
272 ) -> KafkaResult<()> {
273 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
274 let kafka_err = match offset.to_raw() {
275 Some(offset) => unsafe {
276 rdsys::rd_kafka_topic_partition_list_set_offset(
277 self.ptr(),
278 topic_c.as_ptr(),
279 partition,
280 offset,
281 )
282 },
283 None => RDKafkaRespErr::RD_KAFKA_RESP_ERR__INVALID_ARG,
284 };
285
286 if kafka_err.is_error() {
287 Err(KafkaError::SetPartitionOffset(kafka_err.into()))
288 } else {
289 Ok(())
290 }
291 }
292
293 pub fn add_partition_offset(
295 &mut self,
296 topic: &str,
297 partition: i32,
298 offset: Offset,
299 ) -> KafkaResult<()> {
300 self.add_partition(topic, partition);
301 self.set_partition_offset(topic, partition, offset)
302 }
303
304 pub fn find_partition(
306 &self,
307 topic: &str,
308 partition: i32,
309 ) -> Option<TopicPartitionListElem<'_>> {
310 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
311 let elem_ptr = unsafe {
312 rdsys::rd_kafka_topic_partition_list_find(self.ptr(), topic_c.as_ptr(), partition)
313 };
314 if elem_ptr.is_null() {
315 None
316 } else {
317 Some(unsafe { TopicPartitionListElem::from_ptr(self, &mut *elem_ptr) })
318 }
319 }
320
321 pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
323 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
324 for elem_ptr in slice {
325 let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
326 elem.set_offset(offset)?;
327 }
328 Ok(())
329 }
330
331 pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
333 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
334 let mut vec = Vec::with_capacity(slice.len());
335 for elem_ptr in slice {
336 vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
337 }
338 vec
339 }
340
341 pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
343 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
344 let mut vec = Vec::with_capacity(slice.len());
345 for elem_ptr in slice {
346 let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
347 if tp.topic() == topic {
348 vec.push(tp);
349 }
350 }
351 vec
352 }
353
354 pub fn to_topic_map(&self) -> HashMap<(String, i32), Offset> {
356 self.elements()
357 .iter()
358 .map(|elem| ((elem.topic().to_owned(), elem.partition()), elem.offset()))
359 .collect()
360 }
361}
362
363impl PartialEq for TopicPartitionList {
364 fn eq(&self, other: &TopicPartitionList) -> bool {
365 if self.count() != other.count() {
366 return false;
367 }
368 self.elements().iter().all(|elem| {
369 if let Some(other_elem) = other.find_partition(elem.topic(), elem.partition()) {
370 elem == &other_elem
371 } else {
372 false
373 }
374 })
375 }
376}
377
378impl Default for TopicPartitionList {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
384impl fmt::Debug for TopicPartitionList {
385 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386 write!(f, "TPL {{")?;
387 for (i, elem) in self.elements().iter().enumerate() {
388 if i > 0 {
389 write!(f, "; ")?;
390 }
391 write!(
392 f,
393 "{}/{}: offset={:?} metadata={:?}, error={:?}",
394 elem.topic(),
395 elem.partition(),
396 elem.offset(),
397 elem.metadata(),
398 elem.error(),
399 )?;
400 }
401 write!(f, "}}")
402 }
403}
404
// Manual assertions that the list may be moved to and shared between threads.
// NOTE(review): soundness depends on the thread-safety of the native list,
// which cannot be verified from this file alone.
unsafe impl Send for TopicPartitionList {}
unsafe impl Sync for TopicPartitionList {}
407
#[cfg(test)]
mod tests {
    use super::*;

    use std::collections::HashMap;

    /// Asserts that `tpl` contains `topic`/`partition` and that the entry
    /// reports exactly the given topic, partition and offset.
    fn assert_entry(tpl: &TopicPartitionList, topic: &str, partition: i32, offset: Offset) {
        let elem = tpl.find_partition(topic, partition).unwrap();
        assert_eq!(elem.topic(), topic);
        assert_eq!(elem.partition(), partition);
        assert_eq!(elem.offset(), offset);
    }

    #[test]
    fn offset_conversion() {
        // Each pair must round-trip in both directions.
        let cases = [
            (Offset::Offset(123), 123),
            (Offset::OffsetTail(10), -2010),
        ];
        for &(offset, raw) in &cases {
            assert_eq!(offset.to_raw(), Some(raw));
            assert_eq!(Offset::from_raw(raw), offset);
        }
    }

    #[test]
    fn add_partition_offset_find() {
        let mut tpl = TopicPartitionList::new();
        let entries = [
            ("topic1", 0, 0),
            ("topic1", 1, 1),
            ("topic2", 0, 2),
            ("topic2", 1, 3),
        ];

        for &(topic, partition, _) in &entries {
            tpl.add_partition(topic, partition);
        }
        for &(topic, partition, offset) in &entries {
            tpl.set_partition_offset(topic, partition, Offset::Offset(offset))
                .unwrap();
        }

        assert_eq!(tpl.count(), 4);

        // Setting an offset on an entry that was never added must fail.
        for &(topic, partition) in &[("topic0", 3), ("topic3", 0)] {
            assert!(tpl
                .set_partition_offset(topic, partition, Offset::Offset(0))
                .is_err());
        }

        for &(topic, partition, offset) in &entries {
            assert_entry(&tpl, topic, partition, Offset::Offset(offset));
        }

        // Offsets can also be changed through the element itself.
        let mut last = tpl.find_partition("topic2", 1).unwrap();
        last.set_offset(Offset::Offset(1234)).unwrap();
        assert_eq!(last.offset(), Offset::Offset(1234));
    }

    #[test]
    fn add_partition_range() {
        let mut tpl = TopicPartitionList::new();

        tpl.add_partition_range("topic1", 0, 3);

        // Partitions 0 through 3 exist after adding the range...
        for partition in 0..=3 {
            tpl.set_partition_offset("topic1", partition, Offset::Offset(i64::from(partition)))
                .unwrap();
        }
        // ...but partition 4 does not.
        assert!(tpl
            .set_partition_offset("topic1", 4, Offset::Offset(2))
            .is_err());
    }

    #[test]
    fn check_defaults() {
        let mut tpl = TopicPartitionList::new();

        tpl.add_partition("topic1", 0);

        // A freshly added partition starts with the invalid offset.
        let offset = tpl.find_partition("topic1", 0).unwrap().offset();
        assert_eq!(offset, Offset::Invalid);
    }

    #[test]
    fn test_add_partition_offset_clone() {
        let mut tpl = TopicPartitionList::new();
        for partition in 0..2 {
            tpl.add_partition_offset("topic1", partition, Offset::Offset(i64::from(partition)))
                .unwrap();
        }

        assert_entry(&tpl, "topic1", 0, Offset::Offset(0));
        assert_entry(&tpl, "topic1", 1, Offset::Offset(1));

        // The clone must expose the same entries as the original.
        let tpl_cloned = tpl.clone();
        assert_entry(&tpl_cloned, "topic1", 0, Offset::Offset(0));
        assert_entry(&tpl_cloned, "topic1", 1, Offset::Offset(1));
    }

    #[test]
    fn test_topic_map() {
        let topic_map: HashMap<(String, i32), Offset> = vec![
            (("topic1".to_string(), 0), Offset::Invalid),
            (("topic1".to_string(), 1), Offset::Offset(123)),
            (("topic2".to_string(), 0), Offset::Beginning),
        ]
        .into_iter()
        .collect();

        // map -> list -> map -> list must preserve the contents at each step.
        let tpl = TopicPartitionList::from_topic_map(&topic_map).unwrap();
        let topic_map2 = tpl.to_topic_map();
        let tpl2 = TopicPartitionList::from_topic_map(&topic_map2).unwrap();

        assert_eq!(topic_map, topic_map2);
        assert_eq!(tpl, tpl2);
    }
}