1use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14pub use rdsys::types::RDKafkaErrorCode;
16
/// Convenience alias for a [`Result`] whose error type is [`KafkaError`].
pub type KafkaResult<T> = Result<T, KafkaError>;
19
/// Check whether a value represents an error condition.
///
/// Implemented in this module for the native response and configuration
/// result codes as well as for [`RDKafkaError`].
pub trait IsError {
    /// Returns `true` when the value denotes an error, `false` otherwise.
    fn is_error(&self) -> bool;
}
27
28impl IsError for RDKafkaRespErr {
29 fn is_error(&self) -> bool {
30 *self != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR
31 }
32}
33
34impl IsError for RDKafkaConfRes {
35 fn is_error(&self) -> bool {
36 *self != RDKafkaConfRes::RD_KAFKA_CONF_OK
37 }
38}
39
40impl IsError for RDKafkaError {
41 fn is_error(&self) -> bool {
42 self.0.is_some()
43 }
44}
45
/// Native rdkafka error.
///
/// Holds an optional, reference-counted handle to a native
/// `rd_kafka_error_t`. An empty handle (`None`) means "no error" (see the
/// `IsError` implementation). Cloning shares the same handle through the
/// `Arc` rather than duplicating the native object.
#[derive(Clone)]
pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
// Associates native error objects with the label "error" and with
// `rd_kafka_error_destroy` as their destructor. How `TYPE` and `DROP` are
// consumed is defined by `KafkaDrop`/`NativePtr` in `crate::util`, which is
// not visible from this file.
unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
    const TYPE: &'static str = "error";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
}
54
// NOTE(review): these impls assert that the wrapped native error handle may be
// moved to and shared between threads. The accessors in this file only pass
// the handle as a `*const` pointer, but the guarantee ultimately depends on
// librdkafka's thread-safety for error objects — confirm before relying on it.
unsafe impl Send for RDKafkaError {}
unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59 pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60 RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new))
61 }
62
63 fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64 match &self.0 {
65 None => ptr::null(),
66 Some(p) => p.ptr(),
67 }
68 }
69
70 pub fn code(&self) -> RDKafkaErrorCode {
73 unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74 }
75
76 pub fn name(&self) -> String {
79 let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81 }
82
83 pub fn string(&self) -> String {
86 let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87 unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88 }
89
90 pub fn is_fatal(&self) -> bool {
94 unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95 }
96
97 pub fn is_retriable(&self) -> bool {
99 unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100 }
101
102 pub fn txn_requires_abort(&self) -> bool {
104 unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105 }
106}
107
108impl PartialEq for RDKafkaError {
109 fn eq(&self, other: &RDKafkaError) -> bool {
110 self.code() == other.code()
111 }
112}
113
// Marker impl: equality is defined by the code-based `PartialEq` above.
impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 write!(f, "RDKafkaError({})", self)
119 }
120}
121
122impl fmt::Display for RDKafkaError {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.write_str(&self.string())
125 }
126}
127
// Uses the default `Error` methods; no underlying source is reported.
impl Error for RDKafkaError {}
129
/// All errors surfaced by this crate's Kafka client API.
///
/// Variants either carry an [`RDKafkaErrorCode`], a descriptive `String`, or
/// a richer payload. The enum is `#[non_exhaustive]`, so matches outside this
/// crate need a wildcard arm.
#[derive(Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum KafkaError {
    /// Creating an admin operation failed; carries a description.
    AdminOpCreation(String),
    /// An admin operation failed; carries the error code.
    AdminOp(RDKafkaErrorCode),
    /// The client was dropped (rendered as "Client dropped").
    Canceled,
    /// Client configuration error. The formatting impls render the second,
    /// third and fourth fields as description, key and value respectively.
    ClientConfig(RDKafkaConfRes, String, String, String),
    /// Client creation failed; carries a description.
    ClientCreation(String),
    /// Consumer commit failed.
    ConsumerCommit(RDKafkaErrorCode),
    /// Flush failed.
    Flush(RDKafkaErrorCode),
    /// Global error.
    Global(RDKafkaErrorCode),
    /// Fetching the group list failed.
    GroupListFetch(RDKafkaErrorCode),
    /// Message consumption failed.
    MessageConsumption(RDKafkaErrorCode),
    /// Message production failed.
    MessageProduction(RDKafkaErrorCode),
    /// Metadata fetch failed.
    MetadataFetch(RDKafkaErrorCode),
    /// No message was received within the given poll interval.
    NoMessageReceived,
    /// A string passed across the FFI boundary contained an interior NUL
    /// byte (see [`ffi::NulError`]).
    Nul(ffi::NulError),
    /// Offset fetch failed.
    OffsetFetch(RDKafkaErrorCode),
    /// End of partition reached; the payload is presumably the partition
    /// number (it is rendered as `part_n`) — TODO confirm.
    PartitionEOF(i32),
    /// Pausing or resuming failed; carries a description.
    PauseResume(String),
    /// Rebalance failed.
    Rebalance(RDKafkaErrorCode),
    /// Seek failed; carries a description.
    Seek(String),
    /// Setting a partition offset failed.
    SetPartitionOffset(RDKafkaErrorCode),
    /// Storing an offset failed.
    StoreOffset(RDKafkaErrorCode),
    /// Subscription failed; carries a description.
    Subscription(String),
    /// Transaction error; carries the full native error.
    Transaction(RDKafkaError),
    /// Mock cluster error.
    MockCluster(RDKafkaErrorCode),
}
187
188impl fmt::Debug for KafkaError {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 match self {
191 KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
192 KafkaError::AdminOpCreation(ref err) => {
193 write!(f, "KafkaError (Admin operation creation error: {})", err)
194 }
195 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
196 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(
197 f,
198 "KafkaError (Client config error: {} {} {})",
199 desc, key, value
200 ),
201 KafkaError::ClientCreation(ref err) => {
202 write!(f, "KafkaError (Client creation error: {})", err)
203 }
204 KafkaError::ConsumerCommit(err) => {
205 write!(f, "KafkaError (Consumer commit error: {})", err)
206 }
207 KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
208 KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
209 KafkaError::GroupListFetch(err) => {
210 write!(f, "KafkaError (Group list fetch error: {})", err)
211 }
212 KafkaError::MessageConsumption(err) => {
213 write!(f, "KafkaError (Message consumption error: {})", err)
214 }
215 KafkaError::MessageProduction(err) => {
216 write!(f, "KafkaError (Message production error: {})", err)
217 }
218 KafkaError::MetadataFetch(err) => {
219 write!(f, "KafkaError (Metadata fetch error: {})", err)
220 }
221 KafkaError::NoMessageReceived => {
222 write!(f, "No message received within the given poll interval")
223 }
224 KafkaError::Nul(_) => write!(f, "FFI null error"),
225 KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
226 KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
227 KafkaError::PauseResume(ref err) => {
228 write!(f, "KafkaError (Pause/resume error: {})", err)
229 }
230 KafkaError::Rebalance(ref err) => write!(f, "KafkaError (Rebalance error: {})", err),
231 KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
232 KafkaError::SetPartitionOffset(err) => {
233 write!(f, "KafkaError (Set partition offset error: {})", err)
234 }
235 KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
236 KafkaError::Subscription(ref err) => {
237 write!(f, "KafkaError (Subscription error: {})", err)
238 }
239 KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
240 KafkaError::MockCluster(err) => write!(f, "KafkaError (Mock cluster error: {})", err),
241 }
242 }
243}
244
245impl fmt::Display for KafkaError {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 match self {
248 KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
249 KafkaError::AdminOpCreation(ref err) => {
250 write!(f, "Admin operation creation error: {}", err)
251 }
252 KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
253 KafkaError::ClientConfig(_, ref desc, ref key, ref value) => {
254 write!(f, "Client config error: {} {} {}", desc, key, value)
255 }
256 KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
257 KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
258 KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
259 KafkaError::Global(err) => write!(f, "Global error: {}", err),
260 KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
261 KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
262 KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
263 KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
264 KafkaError::NoMessageReceived => {
265 write!(f, "No message received within the given poll interval")
266 }
267 KafkaError::Nul(_) => write!(f, "FFI nul error"),
268 KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
269 KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
270 KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
271 KafkaError::Rebalance(ref err) => write!(f, "Rebalance error: {}", err),
272 KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
273 KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
274 KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
275 KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
276 KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
277 KafkaError::MockCluster(err) => write!(f, "Mock cluster error: {}", err),
278 }
279 }
280}
281
282impl Error for KafkaError {
283 fn source(&self) -> Option<&(dyn Error + 'static)> {
284 match self {
285 KafkaError::AdminOp(_) => None,
286 KafkaError::AdminOpCreation(_) => None,
287 KafkaError::Canceled => None,
288 KafkaError::ClientConfig(..) => None,
289 KafkaError::ClientCreation(_) => None,
290 KafkaError::ConsumerCommit(err) => Some(err),
291 KafkaError::Flush(err) => Some(err),
292 KafkaError::Global(err) => Some(err),
293 KafkaError::GroupListFetch(err) => Some(err),
294 KafkaError::MessageConsumption(err) => Some(err),
295 KafkaError::MessageProduction(err) => Some(err),
296 KafkaError::MetadataFetch(err) => Some(err),
297 KafkaError::NoMessageReceived => None,
298 KafkaError::Nul(_) => None,
299 KafkaError::OffsetFetch(err) => Some(err),
300 KafkaError::PartitionEOF(_) => None,
301 KafkaError::PauseResume(_) => None,
302 KafkaError::Rebalance(err) => Some(err),
303 KafkaError::Seek(_) => None,
304 KafkaError::SetPartitionOffset(err) => Some(err),
305 KafkaError::StoreOffset(err) => Some(err),
306 KafkaError::Subscription(_) => None,
307 KafkaError::Transaction(err) => Some(err),
308 KafkaError::MockCluster(err) => Some(err),
309 }
310 }
311}
312
313impl From<ffi::NulError> for KafkaError {
314 fn from(err: ffi::NulError) -> KafkaError {
315 KafkaError::Nul(err)
316 }
317}
318
319impl KafkaError {
320 #[allow(clippy::match_same_arms)]
322 pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
323 match self {
324 KafkaError::AdminOp(_) => None,
325 KafkaError::AdminOpCreation(_) => None,
326 KafkaError::Canceled => None,
327 KafkaError::ClientConfig(..) => None,
328 KafkaError::ClientCreation(_) => None,
329 KafkaError::ConsumerCommit(err) => Some(*err),
330 KafkaError::Flush(err) => Some(*err),
331 KafkaError::Global(err) => Some(*err),
332 KafkaError::GroupListFetch(err) => Some(*err),
333 KafkaError::MessageConsumption(err) => Some(*err),
334 KafkaError::MessageProduction(err) => Some(*err),
335 KafkaError::MetadataFetch(err) => Some(*err),
336 KafkaError::NoMessageReceived => None,
337 KafkaError::Nul(_) => None,
338 KafkaError::OffsetFetch(err) => Some(*err),
339 KafkaError::PartitionEOF(_) => None,
340 KafkaError::PauseResume(_) => None,
341 KafkaError::Rebalance(err) => Some(*err),
342 KafkaError::Seek(_) => None,
343 KafkaError::SetPartitionOffset(err) => Some(*err),
344 KafkaError::StoreOffset(err) => Some(*err),
345 KafkaError::Subscription(_) => None,
346 KafkaError::Transaction(err) => Some(err.code()),
347 KafkaError::MockCluster(err) => Some(*err),
348 }
349 }
350}