madsim_rdkafka/std/
metadata.rs

1//! Cluster metadata.
2
3use std::ffi::CStr;
4use std::slice;
5
6use rdkafka_sys as rdsys;
7use rdkafka_sys::types::*;
8
9use crate::error::IsError;
10use crate::util::{KafkaDrop, NativePtr};
11
12/// Broker metadata information.
13pub struct MetadataBroker(RDKafkaMetadataBroker);
14
15impl MetadataBroker {
16    /// Returns the id of the broker.
17    pub fn id(&self) -> i32 {
18        self.0.id
19    }
20
21    /// Returns the host name of the broker.
22    pub fn host(&self) -> &str {
23        unsafe {
24            CStr::from_ptr(self.0.host)
25                .to_str()
26                .expect("Broker host is not a valid UTF-8 string")
27        }
28    }
29
30    /// Returns the port of the broker.
31    pub fn port(&self) -> i32 {
32        self.0.port
33    }
34}
35
36/// Partition metadata information.
37pub struct MetadataPartition(RDKafkaMetadataPartition);
38
39impl MetadataPartition {
40    /// Returns the id of the partition.
41    pub fn id(&self) -> i32 {
42        self.0.id
43    }
44
45    /// Returns the broker id of the leader broker for the partition.
46    pub fn leader(&self) -> i32 {
47        self.0.leader
48    }
49
50    // TODO: return result?
51    /// Returns the metadata error for the partition, or `None` if there is no
52    /// error.
53    pub fn error(&self) -> Option<RDKafkaRespErr> {
54        if self.0.err.is_error() {
55            Some(self.0.err)
56        } else {
57            None
58        }
59    }
60
61    /// Returns the broker IDs of the replicas.
62    pub fn replicas(&self) -> &[i32] {
63        unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
64    }
65
66    /// Returns the broker IDs of the in-sync replicas.
67    pub fn isr(&self) -> &[i32] {
68        unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
69    }
70}
71
72/// Topic metadata information.
73pub struct MetadataTopic(RDKafkaMetadataTopic);
74
75impl MetadataTopic {
76    /// Returns the name of the topic.
77    pub fn name(&self) -> &str {
78        unsafe {
79            CStr::from_ptr(self.0.topic)
80                .to_str()
81                .expect("Topic name is not a valid UTF-8 string")
82        }
83    }
84
85    /// Returns the partition metadata information for all the partitions.
86    pub fn partitions(&self) -> &[MetadataPartition] {
87        unsafe {
88            slice::from_raw_parts(
89                self.0.partitions as *const MetadataPartition,
90                self.0.partition_cnt as usize,
91            )
92        }
93    }
94
95    /// Returns the metadata error for the topic, or `None` if there was no
96    /// error.
97    pub fn error(&self) -> Option<RDKafkaRespErr> {
98        if self.0.err.is_error() {
99            Some(self.0.err)
100        } else {
101            None
102        }
103    }
104}
105
106/// Metadata container.
107///
108/// This structure wraps the metadata pointer returned by rdkafka-sys, and
109/// deallocates all the native resources when dropped.
110pub struct Metadata(NativePtr<RDKafkaMetadata>);
111
112unsafe impl KafkaDrop for RDKafkaMetadata {
113    const TYPE: &'static str = "metadata";
114    const DROP: unsafe extern "C" fn(*mut Self) = drop_metadata;
115}
116
117unsafe extern "C" fn drop_metadata(ptr: *mut RDKafkaMetadata) {
118    rdsys::rd_kafka_metadata_destroy(ptr as *const _)
119}
120
121impl Metadata {
122    /// Creates a new Metadata container given a pointer to the native rdkafka-sys metadata.
123    pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaMetadata) -> Metadata {
124        Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
125    }
126
127    /// Returns the ID of the broker originating this metadata.
128    pub fn orig_broker_id(&self) -> i32 {
129        self.0.orig_broker_id
130    }
131
132    /// Returns the hostname of the broker originating this metadata.
133    pub fn orig_broker_name(&self) -> &str {
134        unsafe {
135            CStr::from_ptr(self.0.orig_broker_name)
136                .to_str()
137                .expect("Broker name is not a valid UTF-8 string")
138        }
139    }
140
141    /// Returns the metadata information for all the brokers in the cluster.
142    pub fn brokers(&self) -> &[MetadataBroker] {
143        unsafe {
144            slice::from_raw_parts(
145                self.0.brokers as *const MetadataBroker,
146                self.0.broker_cnt as usize,
147            )
148        }
149    }
150
151    /// Returns the metadata information for all the topics in the cluster.
152    pub fn topics(&self) -> &[MetadataTopic] {
153        unsafe {
154            slice::from_raw_parts(
155                self.0.topics as *const MetadataTopic,
156                self.0.topic_cnt as usize,
157            )
158        }
159    }
160}
161
162unsafe impl Send for Metadata {}
163unsafe impl Sync for Metadata {}