madsim_rdkafka/std/
groups.rs

1//! Group membership API.
2
3use std::ffi::CStr;
4use std::fmt;
5use std::slice;
6
7use rdkafka_sys as rdsys;
8use rdkafka_sys::types::*;
9
10use crate::util::{KafkaDrop, NativePtr};
11
12/// Group member information container.
13pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
14
15impl GroupMemberInfo {
16    /// Returns the ID of the member.
17    pub fn id(&self) -> &str {
18        unsafe {
19            CStr::from_ptr(self.0.member_id)
20                .to_str()
21                .expect("Member id is not a valid UTF-8 string")
22        }
23    }
24
25    /// Returns the client ID of the member.
26    pub fn client_id(&self) -> &str {
27        unsafe {
28            CStr::from_ptr(self.0.client_id)
29                .to_str()
30                .expect("Client id is not a valid UTF-8 string")
31        }
32    }
33
34    /// Return the client host of the member.
35    pub fn client_host(&self) -> &str {
36        unsafe {
37            CStr::from_ptr(self.0.client_host)
38                .to_str()
39                .expect("Member host is not a valid UTF-8 string")
40        }
41    }
42
43    /// Return the metadata of the member.
44    pub fn metadata(&self) -> Option<&[u8]> {
45        unsafe {
46            if self.0.member_metadata.is_null() {
47                None
48            } else {
49                Some(slice::from_raw_parts::<u8>(
50                    self.0.member_metadata as *const u8,
51                    self.0.member_metadata_size as usize,
52                ))
53            }
54        }
55    }
56
57    /// Return the partition assignment of the member.
58    pub fn assignment(&self) -> Option<&[u8]> {
59        unsafe {
60            if self.0.member_assignment.is_null() {
61                None
62            } else {
63                Some(slice::from_raw_parts::<u8>(
64                    self.0.member_assignment as *const u8,
65                    self.0.member_assignment_size as usize,
66                ))
67            }
68        }
69    }
70}
71
72/// Group information container.
73pub struct GroupInfo(RDKafkaGroupInfo);
74
75impl GroupInfo {
76    /// Returns the name of the group.
77    pub fn name(&self) -> &str {
78        unsafe {
79            CStr::from_ptr(self.0.group)
80                .to_str()
81                .expect("Group name is not a valid UTF-8 string")
82        }
83    }
84
85    /// Returns the members of the group.
86    pub fn members(&self) -> &[GroupMemberInfo] {
87        unsafe {
88            slice::from_raw_parts(
89                self.0.members as *const GroupMemberInfo,
90                self.0.member_cnt as usize,
91            )
92        }
93    }
94
95    /// Returns the state of the group.
96    pub fn state(&self) -> &str {
97        unsafe {
98            CStr::from_ptr(self.0.state)
99                .to_str()
100                .expect("State is not a valid UTF-8 string")
101        }
102    }
103
104    /// Returns the protocol of the group.
105    pub fn protocol(&self) -> &str {
106        unsafe {
107            CStr::from_ptr(self.0.protocol)
108                .to_str()
109                .expect("Protocol name is not a valid UTF-8 string")
110        }
111    }
112
113    /// Returns the protocol type of the group.
114    pub fn protocol_type(&self) -> &str {
115        unsafe {
116            CStr::from_ptr(self.0.protocol_type)
117                .to_str()
118                .expect("Protocol type is not a valid UTF-8 string")
119        }
120    }
121}
122
123impl fmt::Debug for GroupInfo {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        write!(f, "{}", self.name())
126    }
127}
128
129/// List of groups.
130///
131/// This structure wraps the pointer returned by rdkafka-sys, and deallocates
132/// all the native resources when dropped.
133pub struct GroupList(NativePtr<RDKafkaGroupList>);
134
135unsafe impl Send for GroupList {}
136
137unsafe impl KafkaDrop for RDKafkaGroupList {
138    const TYPE: &'static str = "group";
139    const DROP: unsafe extern "C" fn(*mut Self) = drop_group_list;
140}
141
142unsafe extern "C" fn drop_group_list(ptr: *mut RDKafkaGroupList) {
143    rdsys::rd_kafka_group_list_destroy(ptr as *const _)
144}
145
146impl GroupList {
147    /// Creates a new group list given a pointer to the native rdkafka-sys group list structure.
148    pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaGroupList) -> GroupList {
149        GroupList(NativePtr::from_ptr(ptr as *mut _).unwrap())
150    }
151
152    /// Returns all the groups in the list.
153    pub fn groups(&self) -> &[GroupInfo] {
154        unsafe {
155            slice::from_raw_parts(self.0.groups as *const GroupInfo, self.0.group_cnt as usize)
156        }
157    }
158}