1use std::ffi::{CStr, CString};
4use std::fmt;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr;
8use std::str;
9use std::time::SystemTime;
10
11use rdkafka_sys as rdsys;
12use rdkafka_sys::types::*;
13
14use crate::error::{IsError, KafkaError, KafkaResult};
15use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
16
/// Timestamp attached to a Kafka message.
///
/// The wrapped `i64` values are handed back unchanged by
/// [`Timestamp::to_millis`], i.e. they are treated as milliseconds.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Timestamp {
    /// No timestamp is available.
    NotAvailable,
    /// Timestamp tagged as the message creation time.
    CreateTime(i64),
    /// Timestamp tagged as the log append time.
    LogAppendTime(i64),
}
27
28impl Timestamp {
29 pub fn to_millis(self) -> Option<i64> {
31 match self {
32 Timestamp::NotAvailable | Timestamp::CreateTime(-1) | Timestamp::LogAppendTime(-1) => {
33 None
34 }
35 Timestamp::CreateTime(t) | Timestamp::LogAppendTime(t) => Some(t),
36 }
37 }
38
39 pub fn now() -> Timestamp {
41 Timestamp::from(SystemTime::now())
42 }
43}
44
45impl From<i64> for Timestamp {
46 fn from(system_time: i64) -> Timestamp {
47 Timestamp::CreateTime(system_time)
48 }
49}
50
51impl From<SystemTime> for Timestamp {
52 fn from(system_time: SystemTime) -> Timestamp {
53 Timestamp::CreateTime(millis_to_epoch(system_time))
54 }
55}
56
57pub trait Headers {
71 fn count(&self) -> usize;
73
74 fn get(&self, idx: usize) -> Header<'_, &[u8]> {
79 self.try_get(idx).unwrap_or_else(|| {
80 panic!(
81 "headers index out of bounds: the count is {} but the index is {}",
82 self.count(),
83 idx,
84 )
85 })
86 }
87
88 fn get_as<V>(&self, idx: usize) -> Result<Header<'_, &V>, V::Error>
93 where
94 V: FromBytes + ?Sized,
95 {
96 self.try_get_as(idx).unwrap_or_else(|| {
97 panic!(
98 "headers index out of bounds: the count is {} but the index is {}",
99 self.count(),
100 idx,
101 )
102 })
103 }
104
105 fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>>;
108
109 fn try_get_as<V>(&self, idx: usize) -> Option<Result<Header<'_, &V>, V::Error>>
112 where
113 V: FromBytes + ?Sized,
114 {
115 self.try_get(idx).map(|header| header.parse())
116 }
117
118 fn iter(&self) -> HeadersIter<'_, Self>
120 where
121 Self: Sized,
122 {
123 HeadersIter {
124 headers: self,
125 index: 0,
126 }
127 }
128}
129
/// A single Kafka message header: a string key with an optional value.
///
/// `V` is the representation of the value (for example `&[u8]` or `&str`).
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Header<'a, V> {
    /// The header's key.
    pub key: &'a str,
    /// The header's value, or `None` if the header carries no value.
    pub value: Option<V>,
}
138
139impl<'a> Header<'a, &'a [u8]> {
140 fn parse<V>(&self) -> Result<Header<'a, &'a V>, V::Error>
141 where
142 V: FromBytes + ?Sized,
143 {
144 Ok(Header {
145 key: self.key,
146 value: self.value.map(V::from_bytes).transpose()?,
147 })
148 }
149}
150
/// Iterator over the headers of a [`Headers`] collection, in index order.
pub struct HeadersIter<'a, H> {
    /// The collection being iterated.
    headers: &'a H,
    /// Position of the next header to yield.
    index: usize,
}
156
157impl<'a, H> Iterator for HeadersIter<'a, H>
158where
159 H: Headers,
160{
161 type Item = Header<'a, &'a [u8]>;
162
163 fn next(&mut self) -> Option<Header<'a, &'a [u8]>> {
164 if self.index < self.headers.count() {
165 let item = self.headers.get(self.index);
166 self.index += 1;
167 Some(item)
168 } else {
169 None
170 }
171 }
172}
173
174pub trait Message {
179 type Headers: Headers;
181
182 fn key(&self) -> Option<&[u8]>;
184
185 fn payload(&self) -> Option<&[u8]>;
187
188 unsafe fn payload_mut(&mut self) -> Option<&mut [u8]>;
197
198 fn topic(&self) -> &str;
200
201 fn partition(&self) -> i32;
203
204 fn offset(&self) -> i64;
206
207 fn timestamp(&self) -> Timestamp;
209
210 fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>> {
214 self.payload().map(P::from_bytes)
215 }
216
217 fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>> {
221 self.key().map(K::from_bytes)
222 }
223
224 fn headers(&self) -> Option<&Self::Headers>;
226}
227
/// Borrowed view of a native header list.
///
/// This is a field-less marker type: it is only ever used behind a reference
/// obtained by casting a native headers pointer (see `from_native_ptr`), and
/// is cast back to the native pointer type in `as_native_ptr`.
pub struct BorrowedHeaders;
233
234impl BorrowedHeaders {
235 unsafe fn from_native_ptr<T>(
236 _owner: &T,
237 headers_ptr: *mut rdsys::rd_kafka_headers_t,
238 ) -> &BorrowedHeaders {
239 &*(headers_ptr as *mut BorrowedHeaders)
240 }
241
242 fn as_native_ptr(&self) -> *const RDKafkaHeaders {
243 self as *const BorrowedHeaders as *const RDKafkaHeaders
244 }
245
246 pub fn detach(&self) -> OwnedHeaders {
251 OwnedHeaders {
252 ptr: unsafe {
253 NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.as_native_ptr())).unwrap()
254 },
255 }
256 }
257}
258
259impl Headers for BorrowedHeaders {
260 fn count(&self) -> usize {
261 unsafe { rdsys::rd_kafka_header_cnt(self.as_native_ptr()) }
262 }
263
264 fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
265 let mut value_ptr = ptr::null();
266 let mut name_ptr = ptr::null();
267 let mut value_size = 0;
268 let err = unsafe {
269 rdsys::rd_kafka_header_get_all(
270 self.as_native_ptr(),
271 idx,
272 &mut name_ptr,
273 &mut value_ptr,
274 &mut value_size,
275 )
276 };
277 if err.is_error() {
278 None
279 } else {
280 unsafe {
281 Some(Header {
282 key: CStr::from_ptr(name_ptr).to_str().unwrap(),
283 value: (!value_ptr.is_null())
284 .then(|| util::ptr_to_slice(value_ptr, value_size)),
285 })
286 }
287 }
288 }
289}
290
/// A message backed by a native message object.
///
/// The lifetime `'a` ties the message to the consumer or owner reference it
/// was created from (see `from_consumer` and `from_dr_callback`).
pub struct BorrowedMessage<'a> {
    /// Handle to the native message.
    ptr: NativePtr<RDKafkaMessage>,
    /// Carries the `'a` borrow without storing any data.
    _owner: PhantomData<&'a u8>,
}
311
// Registers `rd_kafka_message_destroy` as the destructor for native message
// pointers; `TYPE` is a human-readable name for this pointer kind.
unsafe impl KafkaDrop for RDKafkaMessage {
    const TYPE: &'static str = "message";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy;
}
316
317impl<'a> fmt::Debug for BorrowedMessage<'a> {
318 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319 write!(f, "Message {{ ptr: {:?} }}", self.ptr())
320 }
321}
322
323impl<'a> BorrowedMessage<'a> {
324 pub(crate) unsafe fn from_consumer<C>(
331 ptr: NativePtr<RDKafkaMessage>,
332 _consumer: &'a C,
333 ) -> KafkaResult<BorrowedMessage<'a>> {
334 if ptr.err.is_error() {
335 let err = match ptr.err {
336 rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
337 KafkaError::PartitionEOF((*ptr).partition)
338 }
339 e => KafkaError::MessageConsumption(e.into()),
340 };
341 Err(err)
342 } else {
343 Ok(BorrowedMessage {
344 ptr,
345 _owner: PhantomData,
346 })
347 }
348 }
349
350 pub(crate) unsafe fn from_dr_callback<O>(
356 ptr: *mut RDKafkaMessage,
357 _owner: &'a O,
358 ) -> DeliveryResult<'a> {
359 let borrowed_message = BorrowedMessage {
360 ptr: NativePtr::from_ptr(ptr).unwrap(),
361 _owner: PhantomData,
362 };
363 if (*ptr).err.is_error() {
364 Err((
365 KafkaError::MessageProduction((*ptr).err.into()),
366 borrowed_message,
367 ))
368 } else {
369 Ok(borrowed_message)
370 }
371 }
372
373 pub fn ptr(&self) -> *mut RDKafkaMessage {
375 self.ptr.ptr()
376 }
377
378 pub fn topic_ptr(&self) -> *mut RDKafkaTopic {
380 self.ptr.rkt
381 }
382
383 pub fn key_len(&self) -> usize {
385 self.ptr.key_len
386 }
387
388 pub fn payload_len(&self) -> usize {
390 self.ptr.len
391 }
392
393 pub fn detach(&self) -> OwnedMessage {
398 OwnedMessage {
399 key: self.key().map(|k| k.to_vec()),
400 payload: self.payload().map(|p| p.to_vec()),
401 topic: self.topic().to_owned(),
402 timestamp: self.timestamp(),
403 partition: self.partition(),
404 offset: self.offset(),
405 headers: self.headers().map(BorrowedHeaders::detach),
406 }
407 }
408}
409
410impl<'a> Message for BorrowedMessage<'a> {
411 type Headers = BorrowedHeaders;
412
413 fn key(&self) -> Option<&[u8]> {
414 unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
415 }
416
417 fn payload(&self) -> Option<&[u8]> {
418 unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
419 }
420
421 unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
422 util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
423 }
424
425 fn topic(&self) -> &str {
426 unsafe {
427 CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
428 .to_str()
429 .expect("Topic name is not valid UTF-8")
430 }
431 }
432
433 fn partition(&self) -> i32 {
434 self.ptr.partition
435 }
436
437 fn offset(&self) -> i64 {
438 self.ptr.offset
439 }
440
441 fn timestamp(&self) -> Timestamp {
442 let mut timestamp_type = rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
443 let timestamp =
444 unsafe { rdsys::rd_kafka_message_timestamp(self.ptr.ptr(), &mut timestamp_type) };
445 if timestamp == -1 {
446 Timestamp::NotAvailable
447 } else {
448 match timestamp_type {
449 rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE => {
450 Timestamp::NotAvailable
451 }
452 rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_CREATE_TIME => {
453 Timestamp::CreateTime(timestamp)
454 }
455 rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME => {
456 Timestamp::LogAppendTime(timestamp)
457 }
458 }
459 }
460 }
461
462 fn headers(&self) -> Option<&BorrowedHeaders> {
463 let mut native_headers_ptr = ptr::null_mut();
464 unsafe {
465 let err = rdsys::rd_kafka_message_headers(self.ptr.ptr(), &mut native_headers_ptr);
466 match err.into() {
467 RDKafkaErrorCode::NoError => {
468 Some(BorrowedHeaders::from_native_ptr(self, native_headers_ptr))
469 }
470 RDKafkaErrorCode::NoEnt => None,
471 _ => None,
472 }
473 }
474 }
475}
476
// NOTE(review): these impls assert that the wrapped native message may be
// moved to and shared between threads. That guarantee comes from the native
// library and is not established anywhere in this file — confirm.
unsafe impl<'a> Send for BorrowedMessage<'a> {}
unsafe impl<'a> Sync for BorrowedMessage<'a> {}
479
/// A header list owned by Rust code.
///
/// Wraps a native headers object; headers are added with
/// `OwnedHeaders::insert` and read through the `Headers` trait.
#[derive(Debug)]
pub struct OwnedHeaders {
    /// Handle to the native headers object.
    ptr: NativePtr<RDKafkaHeaders>,
}
493
// Registers `rd_kafka_headers_destroy` as the destructor for native headers
// pointers; `TYPE` is a human-readable name for this pointer kind.
unsafe impl KafkaDrop for RDKafkaHeaders {
    const TYPE: &'static str = "headers";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_headers_destroy;
}
498
// NOTE(review): these impls assert that the wrapped native headers object may
// be moved to and shared between threads. That guarantee comes from the
// native library and is not established anywhere in this file — confirm.
unsafe impl Send for OwnedHeaders {}
unsafe impl Sync for OwnedHeaders {}
501
502impl OwnedHeaders {
503 pub fn new() -> OwnedHeaders {
505 OwnedHeaders::new_with_capacity(5)
506 }
507
508 pub fn new_with_capacity(initial_capacity: usize) -> OwnedHeaders {
511 OwnedHeaders {
512 ptr: unsafe {
513 NativePtr::from_ptr(rdsys::rd_kafka_headers_new(initial_capacity)).unwrap()
514 },
515 }
516 }
517
518 pub fn insert<V>(self, header: Header<'_, &V>) -> OwnedHeaders
520 where
521 V: ToBytes + ?Sized,
522 {
523 let name_cstring = CString::new(header.key.to_owned()).unwrap();
524 let (value_ptr, value_len) = match header.value {
525 None => (ptr::null_mut(), 0),
526 Some(value) => {
527 let value_bytes = value.to_bytes();
528 (
529 value_bytes.as_ptr() as *mut c_void,
530 value_bytes.len() as isize,
531 )
532 }
533 };
534 let err = unsafe {
535 rdsys::rd_kafka_header_add(
536 self.ptr(),
537 name_cstring.as_ptr(),
538 name_cstring.as_bytes().len() as isize,
539 value_ptr,
540 value_len,
541 )
542 };
543 assert!(!err.is_error());
545 self
546 }
547
548 pub(crate) fn ptr(&self) -> *mut RDKafkaHeaders {
549 self.ptr.ptr()
550 }
551
552 pub fn as_borrowed(&self) -> &BorrowedHeaders {
554 unsafe { &*(self.ptr() as *mut RDKafkaHeaders as *mut BorrowedHeaders) }
555 }
556}
557
558impl Default for OwnedHeaders {
559 fn default() -> OwnedHeaders {
560 OwnedHeaders::new()
561 }
562}
563
564impl Headers for OwnedHeaders {
565 fn count(&self) -> usize {
566 unsafe { rdsys::rd_kafka_header_cnt(self.ptr()) }
567 }
568
569 fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
570 self.as_borrowed().try_get(idx)
571 }
572}
573
574impl Clone for OwnedHeaders {
575 fn clone(&self) -> Self {
576 OwnedHeaders {
577 ptr: unsafe { NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.ptr())).unwrap() },
578 }
579 }
580}
581
/// A message whose data is fully owned by Rust.
///
/// It can be built with `OwnedMessage::new` or copied out of a
/// `BorrowedMessage` with `BorrowedMessage::detach`.
#[derive(Debug, Clone)]
pub struct OwnedMessage {
    /// Payload bytes, if any.
    payload: Option<Vec<u8>>,
    /// Key bytes, if any.
    key: Option<Vec<u8>>,
    /// Topic name.
    topic: String,
    /// Message timestamp.
    timestamp: Timestamp,
    /// Partition number.
    partition: i32,
    /// Message offset.
    offset: i64,
    /// Headers, if any.
    headers: Option<OwnedHeaders>,
}
597
598impl OwnedMessage {
599 pub fn new(
603 payload: Option<Vec<u8>>,
604 key: Option<Vec<u8>>,
605 topic: String,
606 timestamp: Timestamp,
607 partition: i32,
608 offset: i64,
609 headers: Option<OwnedHeaders>,
610 ) -> OwnedMessage {
611 OwnedMessage {
612 payload,
613 key,
614 topic,
615 timestamp,
616 partition,
617 offset,
618 headers,
619 }
620 }
621
622 pub fn detach_headers(&mut self) -> Option<OwnedHeaders> {
624 self.headers.take()
625 }
626
627 pub fn replace_headers(mut self, headers: Option<OwnedHeaders>) -> Self {
629 if let Some(headers) = headers {
630 self.headers.replace(headers);
631 } else {
632 self.headers = None;
633 }
634 self
635 }
636
637 pub fn set_payload(mut self, payload: Option<Vec<u8>>) -> Self {
639 if let Some(payload) = payload {
640 self.payload.replace(payload);
641 } else {
642 self.payload = None;
643 }
644 self
645 }
646
647 pub fn set_key(mut self, key: Option<Vec<u8>>) -> Self {
649 if let Some(key) = key {
650 self.key.replace(key);
651 } else {
652 self.key = None;
653 }
654 self
655 }
656
657 pub fn set_topic(mut self, topic: String) -> Self {
659 self.topic = topic;
660 self
661 }
662
663 pub fn set_timestamp(mut self, timestamp: Timestamp) -> Self {
665 self.timestamp = timestamp;
666 self
667 }
668
669 pub fn set_partition(mut self, partition: i32) -> Self {
671 self.partition = partition;
672 self
673 }
674
675 pub fn set_offset(mut self, offset: i64) -> Self {
677 self.offset = offset;
678 self
679 }
680}
681
682impl Message for OwnedMessage {
683 type Headers = OwnedHeaders;
684
685 fn key(&self) -> Option<&[u8]> {
686 match self.key {
687 Some(ref k) => Some(k.as_slice()),
688 None => None,
689 }
690 }
691
692 fn payload(&self) -> Option<&[u8]> {
693 self.payload.as_deref()
694 }
695
696 unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
697 self.payload.as_deref_mut()
698 }
699
700 fn topic(&self) -> &str {
701 self.topic.as_ref()
702 }
703
704 fn partition(&self) -> i32 {
705 self.partition
706 }
707
708 fn offset(&self) -> i64 {
709 self.offset
710 }
711
712 fn timestamp(&self) -> Timestamp {
713 self.timestamp
714 }
715
716 fn headers(&self) -> Option<&OwnedHeaders> {
717 self.headers.as_ref()
718 }
719}
720
/// Outcome of a message delivery as built by
/// `BorrowedMessage::from_dr_callback`.
///
/// `Ok` carries the message; `Err` carries the error together with the
/// message it refers to.
pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
735
/// Types that can be viewed as a borrowed interpretation of a byte slice.
pub trait FromBytes {
    /// Error produced when the bytes cannot be interpreted as `Self`.
    type Error;
    /// Attempts to reinterpret `bytes` as a reference to `Self`.
    fn from_bytes(_: &[u8]) -> Result<&Self, Self::Error>;
}

impl FromBytes for [u8] {
    type Error = ();
    /// Always succeeds: a byte slice is returned unchanged.
    fn from_bytes(raw: &[u8]) -> Result<&Self, Self::Error> {
        Result::Ok(raw)
    }
}

impl FromBytes for str {
    type Error = str::Utf8Error;
    /// Succeeds only if the bytes are valid UTF-8.
    fn from_bytes(raw: &[u8]) -> Result<&Self, Self::Error> {
        let text = str::from_utf8(raw)?;
        Ok(text)
    }
}
763
/// Types that can expose their contents as a byte slice.
pub trait ToBytes {
    /// Returns the value's bytes.
    fn to_bytes(&self) -> &[u8];
}

impl ToBytes for [u8] {
    fn to_bytes(&self) -> &[u8] {
        self
    }
}

impl ToBytes for str {
    fn to_bytes(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl ToBytes for Vec<u8> {
    fn to_bytes(&self) -> &[u8] {
        self.as_slice()
    }
}

impl ToBytes for String {
    fn to_bytes(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl<'a, T: ToBytes> ToBytes for &'a T {
    fn to_bytes(&self) -> &[u8] {
        (*self).to_bytes()
    }
}

impl ToBytes for () {
    /// The unit type has no bytes: always returns an empty slice.
    fn to_bytes(&self) -> &[u8] {
        &[]
    }
}

// A single const-generic implementation covers byte arrays of every length,
// rather than a fixed list of lengths generated by a macro.
impl<const N: usize> ToBytes for [u8; N] {
    fn to_bytes(&self) -> &[u8] {
        self
    }
}
828
#[cfg(test)]
mod test {
    use super::*;
    use std::time::SystemTime;

    // `Timestamp::from(SystemTime)` must produce a `CreateTime` holding the
    // value of `millis_to_epoch`, and `Timestamp::now()` must be close to it.
    #[test]
    fn test_timestamp_creation() {
        let now = SystemTime::now();
        let t1 = Timestamp::now();
        let t2 = Timestamp::from(now);
        let expected = Timestamp::CreateTime(util::millis_to_epoch(now));

        assert_eq!(t2, expected);
        // Both timestamps are taken back to back, so they should differ by
        // less than 10 ms.
        assert!(t1.to_millis().unwrap() - t2.to_millis().unwrap() < 10);
    }

    // `to_millis` returns the wrapped value, treats -1 and `NotAvailable` as
    // absent, and `From<i64>` produces a `CreateTime`.
    #[test]
    fn test_timestamp_conversion() {
        assert_eq!(Timestamp::CreateTime(100).to_millis(), Some(100));
        assert_eq!(Timestamp::LogAppendTime(100).to_millis(), Some(100));
        assert_eq!(Timestamp::CreateTime(-1).to_millis(), None);
        assert_eq!(Timestamp::LogAppendTime(-1).to_millis(), None);
        assert_eq!(Timestamp::NotAvailable.to_millis(), None);
        let t: Timestamp = 100.into();
        assert_eq!(t, Timestamp::CreateTime(100));
    }

    // Headers inserted into `OwnedHeaders` can be read back by index, both as
    // raw bytes and parsed as `str`.
    #[test]
    fn test_headers() {
        let owned = OwnedHeaders::new()
            .insert(Header {
                key: "key1",
                value: Some("value1"),
            })
            .insert(Header {
                key: "key2",
                value: Some("value2"),
            });
        assert_eq!(
            owned.get(0),
            Header {
                key: "key1",
                // The bytes of "value1".
                value: Some(&[118, 97, 108, 117, 101, 49][..])
            }
        );
        assert_eq!(
            owned.get_as::<str>(1),
            Ok(Header {
                key: "key2",
                value: Some("value2")
            })
        );
    }
}