madsim_rdkafka/std/
message.rs

1//! Store and manipulate Kafka messages.
2
3use std::ffi::{CStr, CString};
4use std::fmt;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr;
8use std::str;
9use std::time::SystemTime;
10
11use rdkafka_sys as rdsys;
12use rdkafka_sys::types::*;
13
14use crate::error::{IsError, KafkaError, KafkaResult};
15use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
16
/// Timestamp of a Kafka message.
///
/// The wrapped `i64` values are the ones returned unchanged by
/// [`Timestamp::to_millis`], i.e. milliseconds since the epoch.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Timestamp {
    /// Timestamp not available.
    NotAvailable,
    /// Message creation time.
    CreateTime(i64),
    /// Log append time.
    LogAppendTime(i64),
}
27
28impl Timestamp {
29    /// Convert the timestamp to milliseconds since epoch.
30    pub fn to_millis(self) -> Option<i64> {
31        match self {
32            Timestamp::NotAvailable | Timestamp::CreateTime(-1) | Timestamp::LogAppendTime(-1) => {
33                None
34            }
35            Timestamp::CreateTime(t) | Timestamp::LogAppendTime(t) => Some(t),
36        }
37    }
38
39    /// Creates a new `Timestamp::CreateTime` representing the current time.
40    pub fn now() -> Timestamp {
41        Timestamp::from(SystemTime::now())
42    }
43}
44
45impl From<i64> for Timestamp {
46    fn from(system_time: i64) -> Timestamp {
47        Timestamp::CreateTime(system_time)
48    }
49}
50
51impl From<SystemTime> for Timestamp {
52    fn from(system_time: SystemTime) -> Timestamp {
53        Timestamp::CreateTime(millis_to_epoch(system_time))
54    }
55}
56
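// TODO: `TryFrom` is stable now; consider implementing `TryFrom<Timestamp> for i64`
// instead of the panicking `From` sketch below.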
58//impl From<Timestamp> for i64 {
59//    fn from(timestamp: Timestamp) -> i64 {
60//        timestamp.to_millis().unwrap()
61//    }
62//}
63
64/// A generic representation of Kafka message headers.
65///
66/// This trait represents readable message headers. Headers are key-value pairs
67/// that can be sent alongside every message. Only read-only methods are
68/// provided by this trait, as the underlying storage might not allow
69/// modification.
70pub trait Headers {
71    /// Returns the number of contained headers.
72    fn count(&self) -> usize;
73
74    /// Gets the specified header, where the first header corresponds to
75    /// index 0.
76    ///
77    /// Panics if the index is out of bounds.
78    fn get(&self, idx: usize) -> Header<'_, &[u8]> {
79        self.try_get(idx).unwrap_or_else(|| {
80            panic!(
81                "headers index out of bounds: the count is {} but the index is {}",
82                self.count(),
83                idx,
84            )
85        })
86    }
87
88    /// Like [`Headers::get`], but the value of the header will be converted
89    /// to the specified type.
90    ///
91    /// Panics if the index is out of bounds.
92    fn get_as<V>(&self, idx: usize) -> Result<Header<'_, &V>, V::Error>
93    where
94        V: FromBytes + ?Sized,
95    {
96        self.try_get_as(idx).unwrap_or_else(|| {
97            panic!(
98                "headers index out of bounds: the count is {} but the index is {}",
99                self.count(),
100                idx,
101            )
102        })
103    }
104
105    /// Like [`Headers::get`], but returns an option if the header is out of
106    /// bounds rather than panicking.
107    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>>;
108
109    /// Like [`Headers::get`], but returns an option if the header is out of
110    /// bounds rather than panicking.
111    fn try_get_as<V>(&self, idx: usize) -> Option<Result<Header<'_, &V>, V::Error>>
112    where
113        V: FromBytes + ?Sized,
114    {
115        self.try_get(idx).map(|header| header.parse())
116    }
117
118    /// Iterates over all headers in order.
119    fn iter(&self) -> HeadersIter<'_, Self>
120    where
121        Self: Sized,
122    {
123        HeadersIter {
124            headers: self,
125            index: 0,
126        }
127    }
128}
129
/// A Kafka message header.
///
/// `V` is the type of the (optional) value; the key is always a borrowed
/// string slice.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Header<'a, V> {
    /// The header's key.
    pub key: &'a str,
    /// The header's value, or `None` if the header carries no value.
    pub value: Option<V>,
}
138
139impl<'a> Header<'a, &'a [u8]> {
140    fn parse<V>(&self) -> Result<Header<'a, &'a V>, V::Error>
141    where
142        V: FromBytes + ?Sized,
143    {
144        Ok(Header {
145            key: self.key,
146            value: self.value.map(V::from_bytes).transpose()?,
147        })
148    }
149}
150
/// An iterator over [`Headers`].
///
/// Created by [`Headers::iter`]; yields headers in index order.
pub struct HeadersIter<'a, H> {
    // The header collection being iterated.
    headers: &'a H,
    // Index of the next header to yield.
    index: usize,
}
156
157impl<'a, H> Iterator for HeadersIter<'a, H>
158where
159    H: Headers,
160{
161    type Item = Header<'a, &'a [u8]>;
162
163    fn next(&mut self) -> Option<Header<'a, &'a [u8]>> {
164        if self.index < self.headers.count() {
165            let item = self.headers.get(self.index);
166            self.index += 1;
167            Some(item)
168        } else {
169            None
170        }
171    }
172}
173
174/// A generic representation of a Kafka message.
175///
176/// Only read-only methods are provided by this trait, as the underlying storage
177/// might not allow modification.
178pub trait Message {
179    /// The type of headers that this message contains.
180    type Headers: Headers;
181
182    /// Returns the key of the message, or `None` if there is no key.
183    fn key(&self) -> Option<&[u8]>;
184
185    /// Returns the payload of the message, or `None` if there is no payload.
186    fn payload(&self) -> Option<&[u8]>;
187
188    /// Returns a mutable reference to the payload of the message, or `None` if
189    /// there is no payload.
190    ///
191    ///
192    /// # Safety
193    ///
194    /// librdkafka does not formally guarantee that modifying the payload is
195    /// safe. Calling this method may therefore result in undefined behavior.
196    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]>;
197
198    /// Returns the source topic of the message.
199    fn topic(&self) -> &str;
200
201    /// Returns the partition number where the message is stored.
202    fn partition(&self) -> i32;
203
204    /// Returns the offset of the message within the partition.
205    fn offset(&self) -> i64;
206
207    /// Returns the message timestamp.
208    fn timestamp(&self) -> Timestamp;
209
210    /// Converts the raw bytes of the payload to a reference of the specified
211    /// type, that points to the same data inside the message and without
212    /// performing any memory allocation.
213    fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>> {
214        self.payload().map(P::from_bytes)
215    }
216
217    /// Converts the raw bytes of the key to a reference of the specified type,
218    /// that points to the same data inside the message and without performing
219    /// any memory allocation.
220    fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>> {
221        self.key().map(K::from_bytes)
222    }
223
224    /// Returns the headers of the message, or `None` if there are no headers.
225    fn headers(&self) -> Option<&Self::Headers>;
226}
227
/// A zero-copy collection of Kafka message headers.
///
/// Provides a read-only access to headers owned by a Kafka consumer or producer
/// or by an [`OwnedHeaders`] struct.
///
/// This type has no fields: references to it are produced by casting a native
/// headers pointer, and are cast back before being handed to librdkafka.
pub struct BorrowedHeaders;
233
234impl BorrowedHeaders {
235    unsafe fn from_native_ptr<T>(
236        _owner: &T,
237        headers_ptr: *mut rdsys::rd_kafka_headers_t,
238    ) -> &BorrowedHeaders {
239        &*(headers_ptr as *mut BorrowedHeaders)
240    }
241
242    fn as_native_ptr(&self) -> *const RDKafkaHeaders {
243        self as *const BorrowedHeaders as *const RDKafkaHeaders
244    }
245
246    /// Clones the content of `BorrowedHeaders` and returns an [`OwnedHeaders`]
247    /// that can outlive the consumer.
248    ///
249    /// This operation requires memory allocation and can be expensive.
250    pub fn detach(&self) -> OwnedHeaders {
251        OwnedHeaders {
252            ptr: unsafe {
253                NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.as_native_ptr())).unwrap()
254            },
255        }
256    }
257}
258
259impl Headers for BorrowedHeaders {
260    fn count(&self) -> usize {
261        unsafe { rdsys::rd_kafka_header_cnt(self.as_native_ptr()) }
262    }
263
264    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
265        let mut value_ptr = ptr::null();
266        let mut name_ptr = ptr::null();
267        let mut value_size = 0;
268        let err = unsafe {
269            rdsys::rd_kafka_header_get_all(
270                self.as_native_ptr(),
271                idx,
272                &mut name_ptr,
273                &mut value_ptr,
274                &mut value_size,
275            )
276        };
277        if err.is_error() {
278            None
279        } else {
280            unsafe {
281                Some(Header {
282                    key: CStr::from_ptr(name_ptr).to_str().unwrap(),
283                    value: (!value_ptr.is_null())
284                        .then(|| util::ptr_to_slice(value_ptr, value_size)),
285                })
286            }
287        }
288    }
289}
290
/// A zero-copy Kafka message.
///
/// Provides a read-only access to a message owned by a Kafka consumer or
/// producer. Use [`OwnedMessage`] for a message that owns its data.
///
/// ## Consumers
///
/// `BorrowedMessage`s coming from consumers are removed from the consumer
/// buffer once they are dropped. Holding references to too many messages will
/// cause the memory of the consumer to fill up and the consumer to block until
/// some of the `BorrowedMessage`s are dropped.
///
/// ## Conversion to owned
///
/// To transform a `BorrowedMessage` into a [`OwnedMessage`], use the
/// [`detach`](BorrowedMessage::detach) method.
pub struct BorrowedMessage<'a> {
    // Handle to the native message.
    ptr: NativePtr<RDKafkaMessage>,
    // Ties the message to the lifetime `'a` of its owner without storing it.
    _owner: PhantomData<&'a u8>,
}
311
// Registers `rd_kafka_message_destroy` as the `DROP` function for native
// messages, labelled "message".
// NOTE(review): presumably `NativePtr` calls `DROP` when it is dropped —
// confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaMessage {
    const TYPE: &'static str = "message";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy;
}
316
317impl<'a> fmt::Debug for BorrowedMessage<'a> {
318    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319        write!(f, "Message {{ ptr: {:?} }}", self.ptr())
320    }
321}
322
323impl<'a> BorrowedMessage<'a> {
324    /// Creates a new `BorrowedMessage` that wraps the native Kafka message
325    /// pointer returned by a consumer. The lifetime of the message will be
326    /// bound to the lifetime of the consumer passed as parameter. This method
327    /// should only be used with messages coming from consumers. If the message
328    /// contains an error, only the error is returned and the message structure
329    /// is freed.
330    pub(crate) unsafe fn from_consumer<C>(
331        ptr: NativePtr<RDKafkaMessage>,
332        _consumer: &'a C,
333    ) -> KafkaResult<BorrowedMessage<'a>> {
334        if ptr.err.is_error() {
335            let err = match ptr.err {
336                rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
337                    KafkaError::PartitionEOF((*ptr).partition)
338                }
339                e => KafkaError::MessageConsumption(e.into()),
340            };
341            Err(err)
342        } else {
343            Ok(BorrowedMessage {
344                ptr,
345                _owner: PhantomData,
346            })
347        }
348    }
349
350    /// Creates a new `BorrowedMessage` that wraps the native Kafka message
351    /// pointer returned by the delivery callback of a producer. The lifetime of
352    /// the message will be bound to the lifetime of the reference passed as
353    /// parameter. This method should only be used with messages coming from the
354    /// delivery callback. The message will not be freed in any circumstance.
355    pub(crate) unsafe fn from_dr_callback<O>(
356        ptr: *mut RDKafkaMessage,
357        _owner: &'a O,
358    ) -> DeliveryResult<'a> {
359        let borrowed_message = BorrowedMessage {
360            ptr: NativePtr::from_ptr(ptr).unwrap(),
361            _owner: PhantomData,
362        };
363        if (*ptr).err.is_error() {
364            Err((
365                KafkaError::MessageProduction((*ptr).err.into()),
366                borrowed_message,
367            ))
368        } else {
369            Ok(borrowed_message)
370        }
371    }
372
373    /// Returns a pointer to the [`RDKafkaMessage`].
374    pub fn ptr(&self) -> *mut RDKafkaMessage {
375        self.ptr.ptr()
376    }
377
378    /// Returns a pointer to the message's [`RDKafkaTopic`]
379    pub fn topic_ptr(&self) -> *mut RDKafkaTopic {
380        self.ptr.rkt
381    }
382
383    /// Returns the length of the key field of the message.
384    pub fn key_len(&self) -> usize {
385        self.ptr.key_len
386    }
387
388    /// Returns the length of the payload field of the message.
389    pub fn payload_len(&self) -> usize {
390        self.ptr.len
391    }
392
393    /// Clones the content of the `BorrowedMessage` and returns an
394    /// [`OwnedMessage`] that can outlive the consumer.
395    ///
396    /// This operation requires memory allocation and can be expensive.
397    pub fn detach(&self) -> OwnedMessage {
398        OwnedMessage {
399            key: self.key().map(|k| k.to_vec()),
400            payload: self.payload().map(|p| p.to_vec()),
401            topic: self.topic().to_owned(),
402            timestamp: self.timestamp(),
403            partition: self.partition(),
404            offset: self.offset(),
405            headers: self.headers().map(BorrowedHeaders::detach),
406        }
407    }
408}
409
410impl<'a> Message for BorrowedMessage<'a> {
411    type Headers = BorrowedHeaders;
412
413    fn key(&self) -> Option<&[u8]> {
414        unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
415    }
416
417    fn payload(&self) -> Option<&[u8]> {
418        unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
419    }
420
421    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
422        util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
423    }
424
425    fn topic(&self) -> &str {
426        unsafe {
427            CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
428                .to_str()
429                .expect("Topic name is not valid UTF-8")
430        }
431    }
432
433    fn partition(&self) -> i32 {
434        self.ptr.partition
435    }
436
437    fn offset(&self) -> i64 {
438        self.ptr.offset
439    }
440
441    fn timestamp(&self) -> Timestamp {
442        let mut timestamp_type = rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
443        let timestamp =
444            unsafe { rdsys::rd_kafka_message_timestamp(self.ptr.ptr(), &mut timestamp_type) };
445        if timestamp == -1 {
446            Timestamp::NotAvailable
447        } else {
448            match timestamp_type {
449                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE => {
450                    Timestamp::NotAvailable
451                }
452                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_CREATE_TIME => {
453                    Timestamp::CreateTime(timestamp)
454                }
455                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME => {
456                    Timestamp::LogAppendTime(timestamp)
457                }
458            }
459        }
460    }
461
462    fn headers(&self) -> Option<&BorrowedHeaders> {
463        let mut native_headers_ptr = ptr::null_mut();
464        unsafe {
465            let err = rdsys::rd_kafka_message_headers(self.ptr.ptr(), &mut native_headers_ptr);
466            match err.into() {
467                RDKafkaErrorCode::NoError => {
468                    Some(BorrowedHeaders::from_native_ptr(self, native_headers_ptr))
469                }
470                RDKafkaErrorCode::NoEnt => None,
471                _ => None,
472            }
473        }
474    }
475}
476
// Manually marks `BorrowedMessage` as sendable and shareable across threads.
// NOTE(review): the justification is not visible in this file; presumably the
// native message is never mutated behind a shared reference — confirm.
unsafe impl<'a> Send for BorrowedMessage<'a> {}
unsafe impl<'a> Sync for BorrowedMessage<'a> {}
479
480//
481// ********** OWNED MESSAGE **********
482//
483
/// A collection of Kafka message headers that owns its backing data.
///
/// Kafka supports associating an array of key-value pairs to every message,
/// called message headers. The `OwnedHeaders` can be used to create the desired
/// headers and to pass them to the producer. See also [`BorrowedHeaders`].
#[derive(Debug)]
pub struct OwnedHeaders {
    // Handle to the native headers list.
    ptr: NativePtr<RDKafkaHeaders>,
}
493
// Registers `rd_kafka_headers_destroy` as the `DROP` function for native
// headers, labelled "headers".
// NOTE(review): presumably `NativePtr` calls `DROP` when it is dropped —
// confirm in `crate::util`.
unsafe impl KafkaDrop for RDKafkaHeaders {
    const TYPE: &'static str = "headers";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_headers_destroy;
}
498
// Manually marks `OwnedHeaders` as sendable and shareable across threads.
// NOTE(review): the justification is not visible in this file — confirm that
// the native headers list may be accessed from any thread.
unsafe impl Send for OwnedHeaders {}
unsafe impl Sync for OwnedHeaders {}
501
502impl OwnedHeaders {
503    /// Creates a new `OwnedHeaders` struct with initial capacity 5.
504    pub fn new() -> OwnedHeaders {
505        OwnedHeaders::new_with_capacity(5)
506    }
507
508    /// Creates a new `OwnedHeaders` struct with the desired initial capacity.
509    /// The structure is automatically resized as more headers are added.
510    pub fn new_with_capacity(initial_capacity: usize) -> OwnedHeaders {
511        OwnedHeaders {
512            ptr: unsafe {
513                NativePtr::from_ptr(rdsys::rd_kafka_headers_new(initial_capacity)).unwrap()
514            },
515        }
516    }
517
518    /// Inserts a new header.
519    pub fn insert<V>(self, header: Header<'_, &V>) -> OwnedHeaders
520    where
521        V: ToBytes + ?Sized,
522    {
523        let name_cstring = CString::new(header.key.to_owned()).unwrap();
524        let (value_ptr, value_len) = match header.value {
525            None => (ptr::null_mut(), 0),
526            Some(value) => {
527                let value_bytes = value.to_bytes();
528                (
529                    value_bytes.as_ptr() as *mut c_void,
530                    value_bytes.len() as isize,
531                )
532            }
533        };
534        let err = unsafe {
535            rdsys::rd_kafka_header_add(
536                self.ptr(),
537                name_cstring.as_ptr(),
538                name_cstring.as_bytes().len() as isize,
539                value_ptr,
540                value_len,
541            )
542        };
543        // OwnedHeaders should always represent writable instances of RDKafkaHeaders
544        assert!(!err.is_error());
545        self
546    }
547
548    pub(crate) fn ptr(&self) -> *mut RDKafkaHeaders {
549        self.ptr.ptr()
550    }
551
552    /// Generates a read-only [`BorrowedHeaders`] reference.
553    pub fn as_borrowed(&self) -> &BorrowedHeaders {
554        unsafe { &*(self.ptr() as *mut RDKafkaHeaders as *mut BorrowedHeaders) }
555    }
556}
557
558impl Default for OwnedHeaders {
559    fn default() -> OwnedHeaders {
560        OwnedHeaders::new()
561    }
562}
563
564impl Headers for OwnedHeaders {
565    fn count(&self) -> usize {
566        unsafe { rdsys::rd_kafka_header_cnt(self.ptr()) }
567    }
568
569    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
570        self.as_borrowed().try_get(idx)
571    }
572}
573
574impl Clone for OwnedHeaders {
575    fn clone(&self) -> Self {
576        OwnedHeaders {
577            ptr: unsafe { NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.ptr())).unwrap() },
578        }
579    }
580}
581
/// A Kafka message that owns its backing data.
///
/// An `OwnedMessage` can be created from a [`BorrowedMessage`] using the
/// [`BorrowedMessage::detach`] method. `OwnedMessage`s don't hold any reference
/// to the consumer and don't use any memory inside the consumer buffer.
#[derive(Debug, Clone)]
pub struct OwnedMessage {
    // Message payload, if any.
    payload: Option<Vec<u8>>,
    // Message key, if any.
    key: Option<Vec<u8>>,
    // Name of the topic the message belongs to.
    topic: String,
    // Message timestamp.
    timestamp: Timestamp,
    // Partition number.
    partition: i32,
    // Offset within the partition.
    offset: i64,
    // Message headers, if any.
    headers: Option<OwnedHeaders>,
}
597
598impl OwnedMessage {
599    /// Creates a new message with the specified content.
600    ///
601    /// This function is mainly useful in tests of `rust-rdkafka` itself.
602    pub fn new(
603        payload: Option<Vec<u8>>,
604        key: Option<Vec<u8>>,
605        topic: String,
606        timestamp: Timestamp,
607        partition: i32,
608        offset: i64,
609        headers: Option<OwnedHeaders>,
610    ) -> OwnedMessage {
611        OwnedMessage {
612            payload,
613            key,
614            topic,
615            timestamp,
616            partition,
617            offset,
618            headers,
619        }
620    }
621
622    /// Detaches the [`OwnedHeaders`] from this `OwnedMessage`.
623    pub fn detach_headers(&mut self) -> Option<OwnedHeaders> {
624        self.headers.take()
625    }
626
627    /// Replaces the [`OwnedHeaders`] on this `OwnedMessage`.
628    pub fn replace_headers(mut self, headers: Option<OwnedHeaders>) -> Self {
629        if let Some(headers) = headers {
630            self.headers.replace(headers);
631        } else {
632            self.headers = None;
633        }
634        self
635    }
636
637    /// Sets the payload for this `OwnedMessage`.
638    pub fn set_payload(mut self, payload: Option<Vec<u8>>) -> Self {
639        if let Some(payload) = payload {
640            self.payload.replace(payload);
641        } else {
642            self.payload = None;
643        }
644        self
645    }
646
647    /// Sets the key for this `OwnedMessage`.
648    pub fn set_key(mut self, key: Option<Vec<u8>>) -> Self {
649        if let Some(key) = key {
650            self.key.replace(key);
651        } else {
652            self.key = None;
653        }
654        self
655    }
656
657    /// Sets the topic for this `OwnedMessage`.
658    pub fn set_topic(mut self, topic: String) -> Self {
659        self.topic = topic;
660        self
661    }
662
663    /// Sets the timestamp for this `OwnedMessage`.
664    pub fn set_timestamp(mut self, timestamp: Timestamp) -> Self {
665        self.timestamp = timestamp;
666        self
667    }
668
669    /// Sets the partition for this `OwnedMessage`.
670    pub fn set_partition(mut self, partition: i32) -> Self {
671        self.partition = partition;
672        self
673    }
674
675    /// Sets the offset for this `OwnedMessage`.
676    pub fn set_offset(mut self, offset: i64) -> Self {
677        self.offset = offset;
678        self
679    }
680}
681
682impl Message for OwnedMessage {
683    type Headers = OwnedHeaders;
684
685    fn key(&self) -> Option<&[u8]> {
686        match self.key {
687            Some(ref k) => Some(k.as_slice()),
688            None => None,
689        }
690    }
691
692    fn payload(&self) -> Option<&[u8]> {
693        self.payload.as_deref()
694    }
695
696    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
697        self.payload.as_deref_mut()
698    }
699
700    fn topic(&self) -> &str {
701        self.topic.as_ref()
702    }
703
704    fn partition(&self) -> i32 {
705        self.partition
706    }
707
708    fn offset(&self) -> i64 {
709        self.offset
710    }
711
712    fn timestamp(&self) -> Timestamp {
713        self.timestamp
714    }
715
716    fn headers(&self) -> Option<&OwnedHeaders> {
717        self.headers.as_ref()
718    }
719}
720
/// The result of a message production.
///
/// If message production is successful `DeliveryResult` will contain the sent
/// message, which can be used to find which partition and offset the message
/// was sent to. If message production is not successful, the `DeliveryResult`
/// will contain an error and the message that failed to be sent. The partition
/// and offset, in this case, will default to -1 and 0 respectively.
///
/// Concretely, `Ok` holds the [`BorrowedMessage`] and `Err` holds a
/// `(KafkaError, BorrowedMessage)` pair.
///
/// ## Lifetimes
///
/// In both success or failure scenarios, the payload of the message resides in
/// the buffer of the producer and will be automatically removed once the
/// `delivery` callback finishes.
pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
735
/// A cheap conversion from a byte slice to typed data.
///
/// Given a reference to a byte slice, returns a different view of the same
/// data. No allocation is performed, however the underlying data might be
/// checked for correctness (for example when converting to `str`).
///
/// See also the [`ToBytes`] trait.
pub trait FromBytes {
    /// The error type that will be returned if the conversion fails.
    type Error;
    /// Tries to convert the provided byte slice into a different type.
    fn from_bytes(_: &[u8]) -> Result<&Self, Self::Error>;
}

/// Byte slices are returned unchanged; this conversion cannot fail.
impl FromBytes for [u8] {
    type Error = ();
    fn from_bytes(raw: &[u8]) -> Result<&[u8], ()> {
        Ok(raw)
    }
}

/// Strings are validated as UTF-8 before being returned.
impl FromBytes for str {
    type Error = std::str::Utf8Error;
    fn from_bytes(raw: &[u8]) -> Result<&str, std::str::Utf8Error> {
        std::str::from_utf8(raw)
    }
}
763
/// A cheap conversion from typed data to a byte slice.
///
/// Given some data, returns the byte representation of that data.
/// No copy of the data should be performed.
///
/// See also the [`FromBytes`] trait.
pub trait ToBytes {
    /// Converts the provided data to bytes.
    fn to_bytes(&self) -> &[u8];
}

/// A byte slice is its own byte representation.
impl ToBytes for [u8] {
    fn to_bytes(&self) -> &[u8] {
        &self[..]
    }
}

/// A string slice is represented by its UTF-8 bytes.
impl ToBytes for str {
    fn to_bytes(&self) -> &[u8] {
        str::as_bytes(self)
    }
}

/// A byte vector is represented by its contents.
impl ToBytes for Vec<u8> {
    fn to_bytes(&self) -> &[u8] {
        &self[..]
    }
}

/// An owned string is represented by its UTF-8 bytes.
impl ToBytes for String {
    fn to_bytes(&self) -> &[u8] {
        self.as_str().as_bytes()
    }
}

/// A reference converts exactly like the value it points to.
impl<'a, T: ToBytes> ToBytes for &'a T {
    fn to_bytes(&self) -> &[u8] {
        T::to_bytes(*self)
    }
}

/// The unit type is represented by an empty byte slice.
impl ToBytes for () {
    fn to_bytes(&self) -> &[u8] {
        const EMPTY: &[u8] = &[];
        EMPTY
    }
}
810
811// Implement to_bytes for arrays - https://github.com/rust-lang/rfcs/issues/1038
812macro_rules! array_impls {
813    ($($N:expr)+) => {
814        $(
815            impl ToBytes for [u8; $N] {
816                fn to_bytes(&self) -> &[u8] { self }
817            }
818         )+
819    }
820}
821
822array_impls! {
823     0  1  2  3  4  5  6  7  8  9
824    10 11 12 13 14 15 16 17 18 19
825    20 21 22 23 24 25 26 27 28 29
826    30 31 32
827}
828
#[cfg(test)]
mod test {
    use super::*;
    use std::time::SystemTime;

    /// `Timestamp::now` and `Timestamp::from(SystemTime)` agree with
    /// `millis_to_epoch` and with each other to within 10 ms.
    #[test]
    fn test_timestamp_creation() {
        // `now` is captured before `Timestamp::now()` so that `t1 >= t2`.
        let now = SystemTime::now();
        let t1 = Timestamp::now();
        let t2: Timestamp = now.into();
        let expected = Timestamp::CreateTime(util::millis_to_epoch(now));

        assert_eq!(t2, expected);
        let elapsed = t1.to_millis().unwrap() - t2.to_millis().unwrap();
        assert!(elapsed < 10);
    }

    /// `to_millis` maps `-1` and `NotAvailable` to `None`; `From<i64>` builds
    /// a `CreateTime`.
    #[test]
    fn test_timestamp_conversion() {
        assert_eq!(Timestamp::CreateTime(100).to_millis(), Some(100));
        assert_eq!(Timestamp::LogAppendTime(100).to_millis(), Some(100));
        assert_eq!(Timestamp::CreateTime(-1).to_millis(), None);
        assert_eq!(Timestamp::LogAppendTime(-1).to_millis(), None);
        assert_eq!(Timestamp::NotAvailable.to_millis(), None);
        let t = Timestamp::from(100);
        assert_eq!(t, Timestamp::CreateTime(100));
    }

    /// Headers come back in insertion order, both as raw bytes and as `str`.
    #[test]
    fn test_headers() {
        let first = Header {
            key: "key1",
            value: Some("value1"),
        };
        let second = Header {
            key: "key2",
            value: Some("value2"),
        };
        let owned = OwnedHeaders::new().insert(first).insert(second);

        assert_eq!(
            owned.get(0),
            Header {
                key: "key1",
                value: Some(&b"value1"[..])
            }
        );
        assert_eq!(
            owned.get_as::<str>(1),
            Ok(Header {
                key: "key2",
                value: Some("value2")
            })
        );
    }
}