madsim_rdkafka/std/
topic_partition_list.rs

1//! Data structures representing topic, partitions and offsets.
2//!
3//! Compatible with the `RDKafkaTopicPartitionList` exported by `rdkafka-sys`.
4
5use std::collections::HashMap;
6use std::ffi::{CStr, CString};
7use std::fmt;
8use std::slice;
9use std::str;
10
11use libc::c_void;
12use rdkafka_sys as rdsys;
13use rdkafka_sys::types::*;
14
15use crate::error::{IsError, KafkaError, KafkaResult};
16use crate::util::{self, KafkaDrop, NativePtr};
17
18const PARTITION_UNASSIGNED: i32 = -1;
19
20const OFFSET_BEGINNING: i64 = rdsys::RD_KAFKA_OFFSET_BEGINNING as i64;
21const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
22const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
23const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
24const OFFSET_TAIL_BASE: i64 = rdsys::RD_KAFKA_OFFSET_TAIL_BASE as i64;
25
26/// A Kafka offset.
27#[derive(Copy, Clone, Debug, PartialEq, Eq)]
28pub enum Offset {
29    /// Start consuming from the beginning of the partition.
30    Beginning,
31    /// Start consuming from the end of the partition.
32    End,
33    /// Start consuming from the stored offset.
34    Stored,
35    /// Offset not assigned or invalid.
36    Invalid,
37    /// A specific offset to consume from.
38    ///
39    /// Note that while the offset is a signed integer, negative offsets will be
40    /// rejected when passed to librdkafka.
41    Offset(i64),
42    /// An offset relative to the end of the partition.
43    ///
44    /// Note that while the offset is a signed integer, negative offsets will
45    /// be rejected when passed to librdkafka.
46    OffsetTail(i64),
47}
48
49impl Offset {
50    /// Converts the integer representation of an offset used by librdkafka to
51    /// an `Offset`.
52    pub fn from_raw(raw_offset: i64) -> Offset {
53        match raw_offset {
54            OFFSET_BEGINNING => Offset::Beginning,
55            OFFSET_END => Offset::End,
56            OFFSET_STORED => Offset::Stored,
57            OFFSET_INVALID => Offset::Invalid,
58            n if n <= OFFSET_TAIL_BASE => Offset::OffsetTail(-(n - OFFSET_TAIL_BASE)),
59            n => Offset::Offset(n),
60        }
61    }
62
63    /// Converts the `Offset` to the internal integer representation used by
64    /// librdkafka.
65    ///
66    /// Returns `None` if the offset cannot be represented in librdkafka's
67    /// internal representation.
68    pub fn to_raw(self) -> Option<i64> {
69        match self {
70            Offset::Beginning => Some(OFFSET_BEGINNING),
71            Offset::End => Some(OFFSET_END),
72            Offset::Stored => Some(OFFSET_STORED),
73            Offset::Invalid => Some(OFFSET_INVALID),
74            Offset::Offset(n) if n >= 0 => Some(n),
75            Offset::OffsetTail(n) if n > 0 => Some(OFFSET_TAIL_BASE - n),
76            Offset::Offset(_) | Offset::OffsetTail(_) => None,
77        }
78    }
79}
80
81// TODO: implement Debug
82/// One element of the topic partition list.
83pub struct TopicPartitionListElem<'a> {
84    ptr: &'a mut RDKafkaTopicPartition,
85}
86
87unsafe impl Send for TopicPartitionListElem<'_> {}
88unsafe impl Sync for TopicPartitionListElem<'_> {}
89
90impl<'a> TopicPartitionListElem<'a> {
91    // _owner_list serves as a marker so that the lifetime isn't too long
92    fn from_ptr(
93        _owner_list: &'a TopicPartitionList,
94        ptr: &'a mut RDKafkaTopicPartition,
95    ) -> TopicPartitionListElem<'a> {
96        TopicPartitionListElem { ptr }
97    }
98
99    /// Returns the topic name.
100    pub fn topic(&self) -> &str {
101        unsafe {
102            let c_str = self.ptr.topic;
103            CStr::from_ptr(c_str)
104                .to_str()
105                .expect("Topic name is not UTF-8")
106        }
107    }
108
109    /// Returns the optional error associated to the specific entry in the TPL.
110    pub fn error(&self) -> KafkaResult<()> {
111        let kafka_err = self.ptr.err;
112        if kafka_err.is_error() {
113            Err(KafkaError::OffsetFetch(kafka_err.into()))
114        } else {
115            Ok(())
116        }
117    }
118
119    /// Returns the partition number.
120    pub fn partition(&self) -> i32 {
121        self.ptr.partition
122    }
123
124    /// Returns the offset.
125    pub fn offset(&self) -> Offset {
126        let raw_offset = self.ptr.offset;
127        Offset::from_raw(raw_offset)
128    }
129
130    /// Sets the offset.
131    pub fn set_offset(&mut self, offset: Offset) -> KafkaResult<()> {
132        match offset.to_raw() {
133            Some(offset) => {
134                self.ptr.offset = offset;
135                Ok(())
136            }
137            None => Err(KafkaError::SetPartitionOffset(
138                RDKafkaErrorCode::InvalidArgument,
139            )),
140        }
141    }
142
143    /// Returns the optional metadata associated with the entry.
144    pub fn metadata(&self) -> &str {
145        let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
146        str::from_utf8(bytes).expect("Metadata is not UTF-8")
147    }
148
149    /// Sets the optional metadata associated with the entry.
150    pub fn set_metadata<M>(&mut self, metadata: M)
151    where
152        M: AsRef<str>,
153    {
154        let metadata = metadata.as_ref();
155        let buf = unsafe { libc::malloc(metadata.len()) };
156        unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
157        self.ptr.metadata = buf;
158        self.ptr.metadata_size = metadata.len();
159    }
160}
161
162impl<'a> PartialEq for TopicPartitionListElem<'a> {
163    fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
164        self.topic() == other.topic()
165            && self.partition() == other.partition()
166            && self.offset() == other.offset()
167            && self.metadata() == other.metadata()
168    }
169}
170
171/// A structure to store and manipulate a list of topics and partitions with optional offsets.
172pub struct TopicPartitionList {
173    ptr: NativePtr<RDKafkaTopicPartitionList>,
174}
175
176unsafe impl KafkaDrop for RDKafkaTopicPartitionList {
177    const TYPE: &'static str = "topic partition list";
178    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_partition_list_destroy;
179}
180
181impl Clone for TopicPartitionList {
182    fn clone(&self) -> Self {
183        let new_tpl = unsafe { rdsys::rd_kafka_topic_partition_list_copy(self.ptr()) };
184        unsafe { TopicPartitionList::from_ptr(new_tpl) }
185    }
186}
187
188impl TopicPartitionList {
189    /// Creates a new empty list with default capacity.
190    pub fn new() -> TopicPartitionList {
191        TopicPartitionList::with_capacity(5)
192    }
193
194    /// Creates a new empty list with the specified capacity.
195    pub fn with_capacity(capacity: usize) -> TopicPartitionList {
196        let ptr = unsafe { rdsys::rd_kafka_topic_partition_list_new(capacity as i32) };
197        unsafe { TopicPartitionList::from_ptr(ptr) }
198    }
199
200    /// Transforms a pointer to the native librdkafka RDTopicPartitionList into a
201    /// managed `TopicPartitionList` instance.
202    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaTopicPartitionList) -> TopicPartitionList {
203        TopicPartitionList {
204            ptr: NativePtr::from_ptr(ptr).unwrap(),
205        }
206    }
207
208    /// Given a topic map, generates a new `TopicPartitionList`.
209    pub fn from_topic_map(
210        topic_map: &HashMap<(String, i32), Offset>,
211    ) -> KafkaResult<TopicPartitionList> {
212        let mut tpl = TopicPartitionList::with_capacity(topic_map.len());
213        for ((topic_name, partition), offset) in topic_map {
214            tpl.add_partition_offset(topic_name, *partition, *offset)?;
215        }
216        Ok(tpl)
217    }
218
219    /// Returns the pointer to the internal librdkafka structure.
220    pub fn ptr(&self) -> *mut RDKafkaTopicPartitionList {
221        self.ptr.ptr()
222    }
223
224    /// Returns the number of elements in the list.
225    pub fn count(&self) -> usize {
226        self.ptr.cnt as usize
227    }
228
229    /// Returns the capacity of the list.
230    pub fn capacity(&self) -> usize {
231        self.ptr.size as usize
232    }
233
234    /// Adds a topic with unassigned partitions to the list.
235    pub fn add_topic_unassigned<'a>(&'a mut self, topic: &str) -> TopicPartitionListElem<'a> {
236        self.add_partition(topic, PARTITION_UNASSIGNED)
237    }
238
239    /// Adds a topic and partition to the list.
240    pub fn add_partition<'a>(
241        &'a mut self,
242        topic: &str,
243        partition: i32,
244    ) -> TopicPartitionListElem<'a> {
245        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
246        let tp_ptr = unsafe {
247            rdsys::rd_kafka_topic_partition_list_add(self.ptr(), topic_c.as_ptr(), partition)
248        };
249        unsafe { TopicPartitionListElem::from_ptr(self, &mut *tp_ptr) }
250    }
251
252    /// Adds a topic and partition range to the list.
253    pub fn add_partition_range(&mut self, topic: &str, start_partition: i32, stop_partition: i32) {
254        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
255        unsafe {
256            rdsys::rd_kafka_topic_partition_list_add_range(
257                self.ptr(),
258                topic_c.as_ptr(),
259                start_partition,
260                stop_partition,
261            );
262        }
263    }
264
265    /// Sets the offset for an already created topic partition. It will fail if the topic partition
266    /// isn't in the list.
267    pub fn set_partition_offset(
268        &mut self,
269        topic: &str,
270        partition: i32,
271        offset: Offset,
272    ) -> KafkaResult<()> {
273        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
274        let kafka_err = match offset.to_raw() {
275            Some(offset) => unsafe {
276                rdsys::rd_kafka_topic_partition_list_set_offset(
277                    self.ptr(),
278                    topic_c.as_ptr(),
279                    partition,
280                    offset,
281                )
282            },
283            None => RDKafkaRespErr::RD_KAFKA_RESP_ERR__INVALID_ARG,
284        };
285
286        if kafka_err.is_error() {
287            Err(KafkaError::SetPartitionOffset(kafka_err.into()))
288        } else {
289            Ok(())
290        }
291    }
292
293    /// Adds a topic and partition to the list, with the specified offset.
294    pub fn add_partition_offset(
295        &mut self,
296        topic: &str,
297        partition: i32,
298        offset: Offset,
299    ) -> KafkaResult<()> {
300        self.add_partition(topic, partition);
301        self.set_partition_offset(topic, partition, offset)
302    }
303
304    /// Given a topic name and a partition number, returns the corresponding list element.
305    pub fn find_partition(
306        &self,
307        topic: &str,
308        partition: i32,
309    ) -> Option<TopicPartitionListElem<'_>> {
310        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
311        let elem_ptr = unsafe {
312            rdsys::rd_kafka_topic_partition_list_find(self.ptr(), topic_c.as_ptr(), partition)
313        };
314        if elem_ptr.is_null() {
315            None
316        } else {
317            Some(unsafe { TopicPartitionListElem::from_ptr(self, &mut *elem_ptr) })
318        }
319    }
320
321    /// Sets all partitions in the list to the specified offset.
322    pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
323        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
324        for elem_ptr in slice {
325            let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
326            elem.set_offset(offset)?;
327        }
328        Ok(())
329    }
330
331    /// Returns all the elements of the list.
332    pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
333        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
334        let mut vec = Vec::with_capacity(slice.len());
335        for elem_ptr in slice {
336            vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
337        }
338        vec
339    }
340
341    /// Returns all the elements of the list that belong to the specified topic.
342    pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
343        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
344        let mut vec = Vec::with_capacity(slice.len());
345        for elem_ptr in slice {
346            let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
347            if tp.topic() == topic {
348                vec.push(tp);
349            }
350        }
351        vec
352    }
353
354    /// Returns a hashmap-based representation of the list.
355    pub fn to_topic_map(&self) -> HashMap<(String, i32), Offset> {
356        self.elements()
357            .iter()
358            .map(|elem| ((elem.topic().to_owned(), elem.partition()), elem.offset()))
359            .collect()
360    }
361}
362
363impl PartialEq for TopicPartitionList {
364    fn eq(&self, other: &TopicPartitionList) -> bool {
365        if self.count() != other.count() {
366            return false;
367        }
368        self.elements().iter().all(|elem| {
369            if let Some(other_elem) = other.find_partition(elem.topic(), elem.partition()) {
370                elem == &other_elem
371            } else {
372                false
373            }
374        })
375    }
376}
377
378impl Default for TopicPartitionList {
379    fn default() -> Self {
380        Self::new()
381    }
382}
383
384impl fmt::Debug for TopicPartitionList {
385    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386        write!(f, "TPL {{")?;
387        for (i, elem) in self.elements().iter().enumerate() {
388            if i > 0 {
389                write!(f, "; ")?;
390            }
391            write!(
392                f,
393                "{}/{}: offset={:?} metadata={:?}, error={:?}",
394                elem.topic(),
395                elem.partition(),
396                elem.offset(),
397                elem.metadata(),
398                elem.error(),
399            )?;
400        }
401        write!(f, "}}")
402    }
403}
404
405unsafe impl Send for TopicPartitionList {}
406unsafe impl Sync for TopicPartitionList {}
407
408#[cfg(test)]
409mod tests {
410    use super::*;
411
412    use std::collections::HashMap;
413
414    #[test]
415    fn offset_conversion() {
416        assert_eq!(Offset::Offset(123).to_raw(), Some(123));
417        assert_eq!(Offset::from_raw(123), Offset::Offset(123));
418
419        assert_eq!(Offset::OffsetTail(10).to_raw(), Some(-2010));
420        assert_eq!(Offset::from_raw(-2010), Offset::OffsetTail(10));
421    }
422
423    #[test]
424    fn add_partition_offset_find() {
425        let mut tpl = TopicPartitionList::new();
426
427        tpl.add_partition("topic1", 0);
428        tpl.add_partition("topic1", 1);
429        tpl.add_partition("topic2", 0);
430        tpl.add_partition("topic2", 1);
431
432        tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
433            .unwrap();
434        tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
435            .unwrap();
436        tpl.set_partition_offset("topic2", 0, Offset::Offset(2))
437            .unwrap();
438        tpl.set_partition_offset("topic2", 1, Offset::Offset(3))
439            .unwrap();
440
441        assert_eq!(tpl.count(), 4);
442        assert!(tpl
443            .set_partition_offset("topic0", 3, Offset::Offset(0))
444            .is_err());
445        assert!(tpl
446            .set_partition_offset("topic3", 0, Offset::Offset(0))
447            .is_err());
448
449        let tp0 = tpl.find_partition("topic1", 0).unwrap();
450        let tp1 = tpl.find_partition("topic1", 1).unwrap();
451        let tp2 = tpl.find_partition("topic2", 0).unwrap();
452        let mut tp3 = tpl.find_partition("topic2", 1).unwrap();
453
454        assert_eq!(tp0.topic(), "topic1");
455        assert_eq!(tp0.partition(), 0);
456        assert_eq!(tp0.offset(), Offset::Offset(0));
457        assert_eq!(tp1.topic(), "topic1");
458        assert_eq!(tp1.partition(), 1);
459        assert_eq!(tp1.offset(), Offset::Offset(1));
460        assert_eq!(tp2.topic(), "topic2");
461        assert_eq!(tp2.partition(), 0);
462        assert_eq!(tp2.offset(), Offset::Offset(2));
463        assert_eq!(tp3.topic(), "topic2");
464        assert_eq!(tp3.partition(), 1);
465        assert_eq!(tp3.offset(), Offset::Offset(3));
466
467        tp3.set_offset(Offset::Offset(1234)).unwrap();
468        assert_eq!(tp3.offset(), Offset::Offset(1234));
469    }
470
471    #[test]
472    fn add_partition_range() {
473        let mut tpl = TopicPartitionList::new();
474
475        tpl.add_partition_range("topic1", 0, 3);
476
477        tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
478            .unwrap();
479        tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
480            .unwrap();
481        tpl.set_partition_offset("topic1", 2, Offset::Offset(2))
482            .unwrap();
483        tpl.set_partition_offset("topic1", 3, Offset::Offset(3))
484            .unwrap();
485        assert!(tpl
486            .set_partition_offset("topic1", 4, Offset::Offset(2))
487            .is_err());
488    }
489
490    #[test]
491    fn check_defaults() {
492        let mut tpl = TopicPartitionList::new();
493
494        tpl.add_partition("topic1", 0);
495
496        let tp = tpl.find_partition("topic1", 0).unwrap();
497        assert_eq!(tp.offset(), Offset::Invalid);
498    }
499
500    #[test]
501    fn test_add_partition_offset_clone() {
502        let mut tpl = TopicPartitionList::new();
503        tpl.add_partition_offset("topic1", 0, Offset::Offset(0))
504            .unwrap();
505        tpl.add_partition_offset("topic1", 1, Offset::Offset(1))
506            .unwrap();
507
508        let tp0 = tpl.find_partition("topic1", 0).unwrap();
509        let tp1 = tpl.find_partition("topic1", 1).unwrap();
510        assert_eq!(tp0.topic(), "topic1");
511        assert_eq!(tp0.partition(), 0);
512        assert_eq!(tp0.offset(), Offset::Offset(0));
513        assert_eq!(tp1.topic(), "topic1");
514        assert_eq!(tp1.partition(), 1);
515        assert_eq!(tp1.offset(), Offset::Offset(1));
516
517        let tpl_cloned = tpl.clone();
518        let tp0 = tpl_cloned.find_partition("topic1", 0).unwrap();
519        let tp1 = tpl_cloned.find_partition("topic1", 1).unwrap();
520        assert_eq!(tp0.topic(), "topic1");
521        assert_eq!(tp0.partition(), 0);
522        assert_eq!(tp0.offset(), Offset::Offset(0));
523        assert_eq!(tp1.topic(), "topic1");
524        assert_eq!(tp1.partition(), 1);
525        assert_eq!(tp1.offset(), Offset::Offset(1));
526    }
527
528    #[test]
529    fn test_topic_map() {
530        let mut topic_map = HashMap::new();
531        topic_map.insert(("topic1".to_string(), 0), Offset::Invalid);
532        topic_map.insert(("topic1".to_string(), 1), Offset::Offset(123));
533        topic_map.insert(("topic2".to_string(), 0), Offset::Beginning);
534
535        let tpl = TopicPartitionList::from_topic_map(&topic_map).unwrap();
536        let topic_map2 = tpl.to_topic_map();
537        let tpl2 = TopicPartitionList::from_topic_map(&topic_map2).unwrap();
538
539        assert_eq!(topic_map, topic_map2);
540        assert_eq!(tpl, tpl2);
541    }
542}