use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;
use std::str;
use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{self, KafkaDrop, NativePtr};
// Partition number passed to librdkafka by `add_topic_unassigned`, i.e. when a
// topic is added without naming a concrete partition.
const PARTITION_UNASSIGNED: i32 = -1;
// librdkafka's special offset values, converted to `i64` once here so they can
// be used directly as `match` patterns and in arithmetic on raw offsets.
const OFFSET_BEGINNING: i64 = rdsys::RD_KAFKA_OFFSET_BEGINNING as i64;
const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
// Base value for tail-relative offsets: `OffsetTail(n)` is encoded as
// `OFFSET_TAIL_BASE - n` (see `Offset::to_raw`).
const OFFSET_TAIL_BASE: i64 = rdsys::RD_KAFKA_OFFSET_TAIL_BASE as i64;
/// A Kafka offset, either one of librdkafka's special markers or a concrete
/// position.
///
/// Use [`Offset::from_raw`] and [`Offset::to_raw`] to convert from and to the
/// `i64` representation used by librdkafka.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Offset {
/// Corresponds to librdkafka's `RD_KAFKA_OFFSET_BEGINNING`.
Beginning,
/// Corresponds to librdkafka's `RD_KAFKA_OFFSET_END`.
End,
/// Corresponds to librdkafka's `RD_KAFKA_OFFSET_STORED`.
Stored,
/// Corresponds to librdkafka's `RD_KAFKA_OFFSET_INVALID`.
Invalid,
/// A specific offset. Only non-negative values can be converted to a raw
/// offset.
Offset(i64),
/// An offset relative to `RD_KAFKA_OFFSET_TAIL_BASE`. Only strictly positive
/// values can be converted to a raw offset.
OffsetTail(i64),
}
impl Offset {
    /// Converts the raw `i64` offset used by librdkafka into an [`Offset`].
    ///
    /// The special marker values are recognised first. Any other value at or
    /// below `OFFSET_TAIL_BASE` is decoded as a tail-relative offset, and every
    /// remaining value is wrapped verbatim in [`Offset::Offset`].
    ///
    /// Note that `from_raw(OFFSET_TAIL_BASE)` yields `OffsetTail(0)`, which
    /// [`Offset::to_raw`] refuses to encode.
    pub fn from_raw(raw_offset: i64) -> Offset {
        match raw_offset {
            OFFSET_BEGINNING => Offset::Beginning,
            OFFSET_END => Offset::End,
            OFFSET_STORED => Offset::Stored,
            OFFSET_INVALID => Offset::Invalid,
            // `n - OFFSET_TAIL_BASE` is <= 0 here, so the negation is the
            // (non-negative) distance from the tail base.
            n if n <= OFFSET_TAIL_BASE => Offset::OffsetTail(-(n - OFFSET_TAIL_BASE)),
            n => Offset::Offset(n),
        }
    }

    /// Converts the offset into the raw `i64` representation used by
    /// librdkafka.
    ///
    /// Returns `None` when the offset has no valid raw encoding: a negative
    /// [`Offset::Offset`], a non-positive [`Offset::OffsetTail`], or an
    /// [`Offset::OffsetTail`] so large that `OFFSET_TAIL_BASE - n` does not
    /// fit in an `i64`.
    pub fn to_raw(self) -> Option<i64> {
        match self {
            Offset::Beginning => Some(OFFSET_BEGINNING),
            Offset::End => Some(OFFSET_END),
            Offset::Stored => Some(OFFSET_STORED),
            Offset::Invalid => Some(OFFSET_INVALID),
            Offset::Offset(n) if n >= 0 => Some(n),
            // Checked subtraction: a plain `-` would panic in debug builds and
            // silently wrap to an unrelated offset in release builds.
            Offset::OffsetTail(n) if n > 0 => OFFSET_TAIL_BASE.checked_sub(n),
            Offset::Offset(_) | Offset::OffsetTail(_) => None,
        }
    }
}
/// A view onto a single entry of a [`TopicPartitionList`].
///
/// The element borrows the underlying librdkafka entry for `'a` and reads and
/// writes its fields in place.
pub struct TopicPartitionListElem<'a> {
// Borrowed native entry; it is not owned or freed by this wrapper.
ptr: &'a mut RDKafkaTopicPartition,
}
// NOTE(review): the native struct contains raw pointers, so these impls are
// asserted manually. Soundness relies on how elements are handed out by the
// owning list — confirm before extending the API.
unsafe impl Send for TopicPartitionListElem<'_> {}
unsafe impl Sync for TopicPartitionListElem<'_> {}
impl<'a> TopicPartitionListElem<'a> {
    /// Wraps a native partition entry.
    ///
    /// `_owner_list` is never read; it only ties `'a` to a borrow of the
    /// owning list so the element cannot outlive it.
    fn from_ptr(
        _owner_list: &'a TopicPartitionList,
        ptr: &'a mut RDKafkaTopicPartition,
    ) -> TopicPartitionListElem<'a> {
        TopicPartitionListElem { ptr }
    }

    /// Returns the topic name.
    ///
    /// # Panics
    ///
    /// Panics if the stored topic name is not valid UTF-8.
    pub fn topic(&self) -> &str {
        // SAFETY: assumes `topic` points to a NUL-terminated string that stays
        // valid for as long as the entry is borrowed.
        let name = unsafe { CStr::from_ptr(self.ptr.topic) };
        name.to_str().expect("Topic name is not UTF-8")
    }

    /// Returns the error recorded on this entry, if any.
    ///
    /// # Errors
    ///
    /// Returns [`KafkaError::OffsetFetch`] wrapping the entry's error code
    /// when that code signals an error.
    pub fn error(&self) -> KafkaResult<()> {
        let kafka_err = self.ptr.err;
        if kafka_err.is_error() {
            Err(KafkaError::OffsetFetch(kafka_err.into()))
        } else {
            Ok(())
        }
    }

    /// Returns the partition number.
    pub fn partition(&self) -> i32 {
        self.ptr.partition
    }

    /// Returns the offset, decoded from its raw representation.
    pub fn offset(&self) -> Offset {
        Offset::from_raw(self.ptr.offset)
    }

    /// Sets the offset.
    ///
    /// # Errors
    ///
    /// Returns [`KafkaError::SetPartitionOffset`] with
    /// `RDKafkaErrorCode::InvalidArgument` when `offset` has no raw encoding
    /// (see [`Offset::to_raw`]); the entry is left untouched in that case.
    pub fn set_offset(&mut self, offset: Offset) -> KafkaResult<()> {
        match offset.to_raw() {
            Some(raw_offset) => {
                self.ptr.offset = raw_offset;
                Ok(())
            }
            None => Err(KafkaError::SetPartitionOffset(
                RDKafkaErrorCode::InvalidArgument,
            )),
        }
    }

    /// Returns the metadata attached to this entry.
    ///
    /// # Panics
    ///
    /// Panics if the metadata is not valid UTF-8.
    pub fn metadata(&self) -> &str {
        let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
        str::from_utf8(bytes).expect("Metadata is not UTF-8")
    }

    /// Replaces the metadata attached to this entry with a copy of `metadata`.
    ///
    /// The copy is allocated with `libc::malloc` (presumably so librdkafka can
    /// release it with `free` when the entry is destroyed — confirm against
    /// the librdkafka version in use). Any buffer previously attached to the
    /// entry is released first so repeated calls do not leak. Empty metadata
    /// is stored as a null pointer with size zero, matching an entry that
    /// never had metadata set.
    ///
    /// # Panics
    ///
    /// Panics if the allocation fails.
    pub fn set_metadata<M>(&mut self, metadata: M)
    where
        M: AsRef<str>,
    {
        let metadata = metadata.as_ref();
        let len = metadata.len();

        let buf = if len == 0 {
            // Avoid `malloc(0)`/`memcpy` on a possibly-null pointer.
            std::ptr::null_mut()
        } else {
            // SAFETY: plain C allocation; the result is checked before use.
            let buf = unsafe { libc::malloc(len) };
            assert!(
                !buf.is_null(),
                "Failed to allocate memory for partition metadata"
            );
            // SAFETY: `buf` is non-null and `len` bytes long, the source is a
            // live `&str` of the same length, and the regions cannot overlap.
            unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, len) };
            buf
        };

        // SAFETY: the old buffer is either null (a no-op for `free`) or a
        // malloc-family allocation owned by this entry — the same ownership
        // model the buffer stored below relies on.
        unsafe { libc::free(self.ptr.metadata) };
        self.ptr.metadata = buf;
        self.ptr.metadata_size = len;
    }
}
impl<'a> PartialEq for TopicPartitionListElem<'a> {
    /// Two elements are equal when topic, partition, offset and metadata all
    /// match. The per-element error code is not part of the comparison.
    fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
        // Guard clauses keep the original left-to-right short-circuiting, so
        // later accessors are not evaluated once a mismatch is found.
        if self.topic() != other.topic() {
            return false;
        }
        if self.partition() != other.partition() {
            return false;
        }
        if self.offset() != other.offset() {
            return false;
        }
        self.metadata() == other.metadata()
    }
}
/// An owned list of topics and partitions, backed by a native librdkafka
/// topic partition list.
///
/// Entries are exposed as [`TopicPartitionListElem`] views that borrow from
/// the list.
pub struct TopicPartitionList {
// Owning handle to the native list.
ptr: NativePtr<RDKafkaTopicPartitionList>,
}
// Registers the native destructor for topic partition lists.
// NOTE(review): presumably `NativePtr` calls `DROP` when the wrapper is
// dropped and uses `TYPE` for diagnostics — confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaTopicPartitionList {
const TYPE: &'static str = "topic partition list";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_partition_list_destroy;
}
impl Clone for TopicPartitionList {
    /// Creates an independent list by asking librdkafka to copy the native
    /// list and wrapping the resulting pointer.
    fn clone(&self) -> Self {
        unsafe {
            let copied = rdsys::rd_kafka_topic_partition_list_copy(self.ptr());
            TopicPartitionList::from_ptr(copied)
        }
    }
}
impl TopicPartitionList {
    /// Creates a new, empty list with a small default capacity.
    pub fn new() -> TopicPartitionList {
        TopicPartitionList::with_capacity(5)
    }

    /// Creates a new, empty list, passing `capacity` to librdkafka as the
    /// initial size.
    pub fn with_capacity(capacity: usize) -> TopicPartitionList {
        let ptr = unsafe { rdsys::rd_kafka_topic_partition_list_new(capacity as i32) };
        unsafe { TopicPartitionList::from_ptr(ptr) }
    }

    /// Takes ownership of a native topic partition list.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a valid native list that nothing else will free.
    ///
    /// # Panics
    ///
    /// Panics if `NativePtr::from_ptr` rejects the pointer (presumably when it
    /// is null — confirm in `crate::util`).
    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaTopicPartitionList) -> TopicPartitionList {
        TopicPartitionList {
            ptr: NativePtr::from_ptr(ptr).unwrap(),
        }
    }

    /// Builds a list from a map of `(topic, partition)` to offset.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by
    /// [`TopicPartitionList::add_partition_offset`].
    pub fn from_topic_map(
        topic_map: &HashMap<(String, i32), Offset>,
    ) -> KafkaResult<TopicPartitionList> {
        let mut tpl = TopicPartitionList::with_capacity(topic_map.len());
        for ((topic_name, partition), offset) in topic_map {
            tpl.add_partition_offset(topic_name, *partition, *offset)?;
        }
        Ok(tpl)
    }

    /// Returns the raw pointer to the native list.
    pub fn ptr(&self) -> *mut RDKafkaTopicPartitionList {
        self.ptr.ptr()
    }

    /// Returns the number of entries in the list (the native `cnt` field).
    pub fn count(&self) -> usize {
        self.ptr.cnt as usize
    }

    /// Returns the allocated size of the list (the native `size` field).
    pub fn capacity(&self) -> usize {
        self.ptr.size as usize
    }

    /// Converts a topic name into the C string handed to librdkafka.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte, the only way
    /// `CString::new` can fail for a `&str`.
    fn topic_to_c_string(topic: &str) -> CString {
        CString::new(topic).expect("Topic name contains an interior NUL byte")
    }

    /// Adds a topic with the unassigned partition (`PARTITION_UNASSIGNED`).
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn add_topic_unassigned<'a>(&'a mut self, topic: &str) -> TopicPartitionListElem<'a> {
        self.add_partition(topic, PARTITION_UNASSIGNED)
    }

    /// Adds a topic/partition entry and returns a view onto the new entry.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn add_partition<'a>(
        &'a mut self,
        topic: &str,
        partition: i32,
    ) -> TopicPartitionListElem<'a> {
        let topic_c = Self::topic_to_c_string(topic);
        let tp_ptr = unsafe {
            rdsys::rd_kafka_topic_partition_list_add(self.ptr(), topic_c.as_ptr(), partition)
        };
        unsafe { TopicPartitionListElem::from_ptr(self, &mut *tp_ptr) }
    }

    /// Adds the partitions from `start_partition` to `stop_partition` of
    /// `topic` via `rd_kafka_topic_partition_list_add_range`.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn add_partition_range(&mut self, topic: &str, start_partition: i32, stop_partition: i32) {
        let topic_c = Self::topic_to_c_string(topic);
        unsafe {
            rdsys::rd_kafka_topic_partition_list_add_range(
                self.ptr(),
                topic_c.as_ptr(),
                start_partition,
                stop_partition,
            );
        }
    }

    /// Sets the offset of an entry that is already in the list.
    ///
    /// # Errors
    ///
    /// Returns [`KafkaError::SetPartitionOffset`] when `offset` has no raw
    /// encoding or when librdkafka reports an error for the topic/partition.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn set_partition_offset(
        &mut self,
        topic: &str,
        partition: i32,
        offset: Offset,
    ) -> KafkaResult<()> {
        let topic_c = Self::topic_to_c_string(topic);
        let kafka_err = match offset.to_raw() {
            Some(offset) => unsafe {
                rdsys::rd_kafka_topic_partition_list_set_offset(
                    self.ptr(),
                    topic_c.as_ptr(),
                    partition,
                    offset,
                )
            },
            None => RDKafkaRespErr::RD_KAFKA_RESP_ERR__INVALID_ARG,
        };
        if kafka_err.is_error() {
            Err(KafkaError::SetPartitionOffset(kafka_err.into()))
        } else {
            Ok(())
        }
    }

    /// Adds a topic/partition entry with the given offset.
    ///
    /// The offset is validated before the list is modified, so a failed call
    /// leaves the list unchanged. The offset is written to the newly added
    /// entry itself rather than looked up by topic/partition afterwards, so an
    /// existing entry with the same topic/partition is not modified.
    ///
    /// # Errors
    ///
    /// Returns [`KafkaError::SetPartitionOffset`] with
    /// `RDKafkaErrorCode::InvalidArgument` when `offset` has no raw encoding.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn add_partition_offset(
        &mut self,
        topic: &str,
        partition: i32,
        offset: Offset,
    ) -> KafkaResult<()> {
        if offset.to_raw().is_none() {
            return Err(KafkaError::SetPartitionOffset(
                RDKafkaErrorCode::InvalidArgument,
            ));
        }
        self.add_partition(topic, partition).set_offset(offset)
    }

    /// Looks up an entry by topic and partition.
    ///
    /// Returns `None` when librdkafka finds no matching entry.
    ///
    /// # Panics
    ///
    /// Panics if `topic` contains an interior NUL byte.
    pub fn find_partition(
        &self,
        topic: &str,
        partition: i32,
    ) -> Option<TopicPartitionListElem<'_>> {
        let topic_c = Self::topic_to_c_string(topic);
        let elem_ptr = unsafe {
            rdsys::rd_kafka_topic_partition_list_find(self.ptr(), topic_c.as_ptr(), partition)
        };
        if elem_ptr.is_null() {
            None
        } else {
            Some(unsafe { TopicPartitionListElem::from_ptr(self, &mut *elem_ptr) })
        }
    }

    /// Sets every entry in the list to `offset`.
    ///
    /// An empty list is left untouched and reported as success.
    ///
    /// # Errors
    ///
    /// Returns [`KafkaError::SetPartitionOffset`] with
    /// `RDKafkaErrorCode::InvalidArgument` when the list is non-empty and
    /// `offset` has no raw encoding; no entry is modified in that case.
    pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
        let count = self.count();
        let elems = self.ptr.elems;
        // `slice::from_raw_parts_mut` requires a non-null pointer even for a
        // zero-length slice, so an empty list must not reach it.
        if count == 0 || elems.is_null() {
            return Ok(());
        }
        let raw_offset = match offset.to_raw() {
            Some(raw_offset) => raw_offset,
            None => {
                return Err(KafkaError::SetPartitionOffset(
                    RDKafkaErrorCode::InvalidArgument,
                ))
            }
        };
        // SAFETY: `elems` is non-null and, per the native list's own `cnt`
        // field, points to `count` initialised entries; `&mut self` gives
        // exclusive access to the list.
        let entries = unsafe { slice::from_raw_parts_mut(elems, count) };
        for entry in entries {
            entry.offset = raw_offset;
        }
        Ok(())
    }

    /// Returns views onto all entries of the list, in list order.
    pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
        let count = self.count();
        let elems = self.ptr.elems;
        // `slice::from_raw_parts_mut` requires a non-null pointer even for a
        // zero-length slice, so an empty list must not reach it.
        if count == 0 || elems.is_null() {
            return Vec::new();
        }
        // SAFETY: `elems` is non-null and, per the native list's own `cnt`
        // field, points to `count` initialised entries.
        let entries = unsafe { slice::from_raw_parts_mut(elems, count) };
        entries
            .iter_mut()
            .map(|entry| TopicPartitionListElem::from_ptr(self, entry))
            .collect()
    }

    /// Returns views onto all entries whose topic name equals `topic`.
    pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
        self.elements()
            .into_iter()
            .filter(|elem| elem.topic() == topic)
            .collect()
    }

    /// Returns a map from `(topic, partition)` to offset for all entries.
    pub fn to_topic_map(&self) -> HashMap<(String, i32), Offset> {
        self.elements()
            .iter()
            .map(|elem| ((elem.topic().to_owned(), elem.partition()), elem.offset()))
            .collect()
    }
}
impl PartialEq for TopicPartitionList {
    /// Two lists are equal when they hold the same number of entries and every
    /// entry of `self` has an equal entry in `other` with the same topic and
    /// partition. Entry order is not significant.
    fn eq(&self, other: &TopicPartitionList) -> bool {
        self.count() == other.count()
            && self.elements().iter().all(|elem| {
                match other.find_partition(elem.topic(), elem.partition()) {
                    Some(counterpart) => *elem == counterpart,
                    None => false,
                }
            })
    }
}
impl Default for TopicPartitionList {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for TopicPartitionList {
    /// Formats the list as `TPL {…}` with entries separated by `; `, each
    /// showing topic/partition, offset, metadata and error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "TPL {{")?;
        let mut is_first = true;
        for elem in self.elements() {
            // The separator goes between entries only, never before the first.
            if !is_first {
                write!(f, "; ")?;
            }
            is_first = false;
            write!(
                f,
                "{}/{}: offset={:?} metadata={:?}, error={:?}",
                elem.topic(),
                elem.partition(),
                elem.offset(),
                elem.metadata(),
                elem.error(),
            )?;
        }
        write!(f, "}}")
    }
}
// The list wraps a raw native pointer, so thread-safety is asserted manually.
// NOTE(review): several methods taking `&self` hand out element views that
// can mutate the native entries — confirm that `Sync` is justified for the
// intended usage.
unsafe impl Send for TopicPartitionList {}
unsafe impl Sync for TopicPartitionList {}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Asserts that `tpl` holds `topic1/0` at offset 0 and `topic1/1` at
    /// offset 1.
    fn assert_topic1_offsets(tpl: &TopicPartitionList) {
        let first = tpl.find_partition("topic1", 0).unwrap();
        let second = tpl.find_partition("topic1", 1).unwrap();
        assert_eq!(first.topic(), "topic1");
        assert_eq!(first.partition(), 0);
        assert_eq!(first.offset(), Offset::Offset(0));
        assert_eq!(second.topic(), "topic1");
        assert_eq!(second.partition(), 1);
        assert_eq!(second.offset(), Offset::Offset(1));
    }

    // Plain and tail-relative offsets round-trip through the raw encoding.
    #[test]
    fn offset_conversion() {
        let cases: [(Offset, i64); 2] = [(Offset::Offset(123), 123), (Offset::OffsetTail(10), -2010)];
        for &(offset, raw) in &cases {
            assert_eq!(offset.to_raw(), Some(raw));
            assert_eq!(Offset::from_raw(raw), offset);
        }
    }

    // Offsets can be set on existing entries only, and are visible through
    // `find_partition`.
    #[test]
    fn add_partition_offset_find() {
        let expected: [(&str, i32, i64); 4] = [
            ("topic1", 0, 0),
            ("topic1", 1, 1),
            ("topic2", 0, 2),
            ("topic2", 1, 3),
        ];

        let mut tpl = TopicPartitionList::new();
        for &(topic, partition, _) in &expected {
            tpl.add_partition(topic, partition);
        }
        for &(topic, partition, offset) in &expected {
            tpl.set_partition_offset(topic, partition, Offset::Offset(offset))
                .unwrap();
        }
        assert_eq!(tpl.count(), 4);

        // Entries that were never added must be rejected.
        let missing: [(&str, i32); 2] = [("topic0", 3), ("topic3", 0)];
        for &(topic, partition) in &missing {
            assert!(tpl
                .set_partition_offset(topic, partition, Offset::Offset(0))
                .is_err());
        }

        for &(topic, partition, offset) in &expected {
            let elem = tpl.find_partition(topic, partition).unwrap();
            assert_eq!(elem.topic(), topic);
            assert_eq!(elem.partition(), partition);
            assert_eq!(elem.offset(), Offset::Offset(offset));
        }

        let mut last = tpl.find_partition("topic2", 1).unwrap();
        last.set_offset(Offset::Offset(1234)).unwrap();
        assert_eq!(last.offset(), Offset::Offset(1234));
    }

    // A range covers both of its end points and nothing beyond them.
    #[test]
    fn add_partition_range() {
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_range("topic1", 0, 3);
        for partition in 0..=3 {
            tpl.set_partition_offset("topic1", partition, Offset::Offset(i64::from(partition)))
                .unwrap();
        }
        assert!(tpl
            .set_partition_offset("topic1", 4, Offset::Offset(2))
            .is_err());
    }

    // A freshly added entry starts out with the invalid offset.
    #[test]
    fn check_defaults() {
        let mut tpl = TopicPartitionList::new();
        tpl.add_partition("topic1", 0);
        let elem = tpl.find_partition("topic1", 0).unwrap();
        assert_eq!(elem.offset(), Offset::Invalid);
    }

    // Cloning preserves topics, partitions and offsets.
    #[test]
    fn test_add_partition_offset_clone() {
        let mut tpl = TopicPartitionList::new();
        for partition in 0..2 {
            tpl.add_partition_offset("topic1", partition, Offset::Offset(i64::from(partition)))
                .unwrap();
        }
        assert_topic1_offsets(&tpl);

        let tpl_cloned = tpl.clone();
        assert_topic1_offsets(&tpl_cloned);
    }

    // Map -> list -> map -> list round-trips without loss.
    #[test]
    fn test_topic_map() {
        let topic_map: HashMap<(String, i32), Offset> = vec![
            (("topic1".to_string(), 0), Offset::Invalid),
            (("topic1".to_string(), 1), Offset::Offset(123)),
            (("topic2".to_string(), 0), Offset::Beginning),
        ]
        .into_iter()
        .collect();

        let tpl = TopicPartitionList::from_topic_map(&topic_map).unwrap();
        let round_tripped_map = tpl.to_topic_map();
        let round_tripped_tpl = TopicPartitionList::from_topic_map(&round_tripped_map).unwrap();

        assert_eq!(topic_map, round_tripped_map);
        assert_eq!(tpl, round_tripped_tpl);
    }
}