madsim_rdkafka/std/
error.rs

1//! Error manipulations.
2
3use std::error::Error;
4use std::ffi::{self, CStr};
5use std::fmt;
6use std::ptr;
7use std::sync::Arc;
8
9use rdkafka_sys as rdsys;
10use rdkafka_sys::types::*;
11
12use crate::util::{KafkaDrop, NativePtr};
13
14// Re-export rdkafka error code
15pub use rdsys::types::RDKafkaErrorCode;
16
17/// Kafka result.
18pub type KafkaResult<T> = Result<T, KafkaError>;
19
20/// Verify if the value represents an error condition.
21///
22/// Some librdkafka codes are informational, rather than true errors.
23pub trait IsError {
24    /// Reports whether the value represents an error.
25    fn is_error(&self) -> bool;
26}
27
28impl IsError for RDKafkaRespErr {
29    fn is_error(&self) -> bool {
30        *self != RDKafkaRespErr::RD_KAFKA_RESP_ERR_NO_ERROR
31    }
32}
33
34impl IsError for RDKafkaConfRes {
35    fn is_error(&self) -> bool {
36        *self != RDKafkaConfRes::RD_KAFKA_CONF_OK
37    }
38}
39
40impl IsError for RDKafkaError {
41    fn is_error(&self) -> bool {
42        self.0.is_some()
43    }
44}
45
46/// Native rdkafka error.
47#[derive(Clone)]
48pub struct RDKafkaError(Option<Arc<NativePtr<rdsys::rd_kafka_error_t>>>);
49
50unsafe impl KafkaDrop for rdsys::rd_kafka_error_t {
51    const TYPE: &'static str = "error";
52    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_error_destroy;
53}
54
55unsafe impl Send for RDKafkaError {}
56unsafe impl Sync for RDKafkaError {}
57
58impl RDKafkaError {
59    pub(crate) unsafe fn from_ptr(ptr: *mut rdsys::rd_kafka_error_t) -> RDKafkaError {
60        RDKafkaError(NativePtr::from_ptr(ptr).map(Arc::new))
61    }
62
63    fn ptr(&self) -> *const rdsys::rd_kafka_error_t {
64        match &self.0 {
65            None => ptr::null(),
66            Some(p) => p.ptr(),
67        }
68    }
69
70    /// Returns the error code or [`RDKafkaErrorCode::NoError`] if the error is
71    /// null.
72    pub fn code(&self) -> RDKafkaErrorCode {
73        unsafe { rdsys::rd_kafka_error_code(self.ptr()).into() }
74    }
75
76    /// Returns the error code name, e.g., "ERR_UNKNOWN_MEMBER_ID" or an empty
77    /// string if the error is null.
78    pub fn name(&self) -> String {
79        let cstr = unsafe { rdsys::rd_kafka_error_name(self.ptr()) };
80        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
81    }
82
83    /// Returns a human readable error string or an empty string if the error is
84    /// null.
85    pub fn string(&self) -> String {
86        let cstr = unsafe { rdsys::rd_kafka_error_string(self.ptr()) };
87        unsafe { CStr::from_ptr(cstr).to_string_lossy().into_owned() }
88    }
89
90    /// Reports whether the error is a fatal error.
91    ///
92    /// A fatal error indicates that the client instance is no longer usable.
93    pub fn is_fatal(&self) -> bool {
94        unsafe { rdsys::rd_kafka_error_is_fatal(self.ptr()) != 0 }
95    }
96
97    /// Reports whether the operation that encountered the error can be retried.
98    pub fn is_retriable(&self) -> bool {
99        unsafe { rdsys::rd_kafka_error_is_retriable(self.ptr()) != 0 }
100    }
101
102    /// Reports whether the error is an abortable transaction error.
103    pub fn txn_requires_abort(&self) -> bool {
104        unsafe { rdsys::rd_kafka_error_txn_requires_abort(self.ptr()) != 0 }
105    }
106}
107
108impl PartialEq for RDKafkaError {
109    fn eq(&self, other: &RDKafkaError) -> bool {
110        self.code() == other.code()
111    }
112}
113
114impl Eq for RDKafkaError {}
115
116impl fmt::Debug for RDKafkaError {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        write!(f, "RDKafkaError({})", self)
119    }
120}
121
122impl fmt::Display for RDKafkaError {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.write_str(&self.string())
125    }
126}
127
128impl Error for RDKafkaError {}
129
130// TODO: consider using macro
131
132/// Represents all possible Kafka errors.
133///
134/// If applicable, check the underlying [`RDKafkaErrorCode`] to get details.
135#[derive(Clone, PartialEq, Eq)]
136#[non_exhaustive]
137pub enum KafkaError {
138    /// Creation of admin operation failed.
139    AdminOpCreation(String),
140    /// The admin operation itself failed.
141    AdminOp(RDKafkaErrorCode),
142    /// The client was dropped before the operation completed.
143    Canceled,
144    /// Invalid client configuration.
145    ClientConfig(RDKafkaConfRes, String, String, String),
146    /// Client creation failed.
147    ClientCreation(String),
148    /// Consumer commit failed.
149    ConsumerCommit(RDKafkaErrorCode),
150    /// Flushing failed
151    Flush(RDKafkaErrorCode),
152    /// Global error.
153    Global(RDKafkaErrorCode),
154    /// Group list fetch failed.
155    GroupListFetch(RDKafkaErrorCode),
156    /// Message consumption failed.
157    MessageConsumption(RDKafkaErrorCode),
158    /// Message production error.
159    MessageProduction(RDKafkaErrorCode),
160    /// Metadata fetch error.
161    MetadataFetch(RDKafkaErrorCode),
162    /// No message was received.
163    NoMessageReceived,
164    /// Unexpected null pointer
165    Nul(ffi::NulError),
166    /// Offset fetch failed.
167    OffsetFetch(RDKafkaErrorCode),
168    /// End of partition reached.
169    PartitionEOF(i32),
170    /// Pause/Resume failed.
171    PauseResume(String),
172    /// Rebalance failed.
173    Rebalance(RDKafkaErrorCode),
174    /// Seeking a partition failed.
175    Seek(String),
176    /// Setting partition offset failed.
177    SetPartitionOffset(RDKafkaErrorCode),
178    /// Offset store failed.
179    StoreOffset(RDKafkaErrorCode),
180    /// Subscription creation failed.
181    Subscription(String),
182    /// Transaction error.
183    Transaction(RDKafkaError),
184    /// Mock Cluster error
185    MockCluster(RDKafkaErrorCode),
186}
187
188impl fmt::Debug for KafkaError {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        match self {
191            KafkaError::AdminOp(err) => write!(f, "KafkaError (Admin operation error: {})", err),
192            KafkaError::AdminOpCreation(ref err) => {
193                write!(f, "KafkaError (Admin operation creation error: {})", err)
194            }
195            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
196            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => write!(
197                f,
198                "KafkaError (Client config error: {} {} {})",
199                desc, key, value
200            ),
201            KafkaError::ClientCreation(ref err) => {
202                write!(f, "KafkaError (Client creation error: {})", err)
203            }
204            KafkaError::ConsumerCommit(err) => {
205                write!(f, "KafkaError (Consumer commit error: {})", err)
206            }
207            KafkaError::Flush(err) => write!(f, "KafkaError (Flush error: {})", err),
208            KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
209            KafkaError::GroupListFetch(err) => {
210                write!(f, "KafkaError (Group list fetch error: {})", err)
211            }
212            KafkaError::MessageConsumption(err) => {
213                write!(f, "KafkaError (Message consumption error: {})", err)
214            }
215            KafkaError::MessageProduction(err) => {
216                write!(f, "KafkaError (Message production error: {})", err)
217            }
218            KafkaError::MetadataFetch(err) => {
219                write!(f, "KafkaError (Metadata fetch error: {})", err)
220            }
221            KafkaError::NoMessageReceived => {
222                write!(f, "No message received within the given poll interval")
223            }
224            KafkaError::Nul(_) => write!(f, "FFI null error"),
225            KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
226            KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
227            KafkaError::PauseResume(ref err) => {
228                write!(f, "KafkaError (Pause/resume error: {})", err)
229            }
230            KafkaError::Rebalance(ref err) => write!(f, "KafkaError (Rebalance error: {})", err),
231            KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
232            KafkaError::SetPartitionOffset(err) => {
233                write!(f, "KafkaError (Set partition offset error: {})", err)
234            }
235            KafkaError::StoreOffset(err) => write!(f, "KafkaError (Store offset error: {})", err),
236            KafkaError::Subscription(ref err) => {
237                write!(f, "KafkaError (Subscription error: {})", err)
238            }
239            KafkaError::Transaction(err) => write!(f, "KafkaError (Transaction error: {})", err),
240            KafkaError::MockCluster(err) => write!(f, "KafkaError (Mock cluster error: {})", err),
241        }
242    }
243}
244
245impl fmt::Display for KafkaError {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        match self {
248            KafkaError::AdminOp(err) => write!(f, "Admin operation error: {}", err),
249            KafkaError::AdminOpCreation(ref err) => {
250                write!(f, "Admin operation creation error: {}", err)
251            }
252            KafkaError::Canceled => write!(f, "KafkaError (Client dropped)"),
253            KafkaError::ClientConfig(_, ref desc, ref key, ref value) => {
254                write!(f, "Client config error: {} {} {}", desc, key, value)
255            }
256            KafkaError::ClientCreation(ref err) => write!(f, "Client creation error: {}", err),
257            KafkaError::ConsumerCommit(err) => write!(f, "Consumer commit error: {}", err),
258            KafkaError::Flush(err) => write!(f, "Flush error: {}", err),
259            KafkaError::Global(err) => write!(f, "Global error: {}", err),
260            KafkaError::GroupListFetch(err) => write!(f, "Group list fetch error: {}", err),
261            KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err),
262            KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err),
263            KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err),
264            KafkaError::NoMessageReceived => {
265                write!(f, "No message received within the given poll interval")
266            }
267            KafkaError::Nul(_) => write!(f, "FFI nul error"),
268            KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
269            KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
270            KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
271            KafkaError::Rebalance(ref err) => write!(f, "Rebalance error: {}", err),
272            KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
273            KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
274            KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
275            KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
276            KafkaError::Transaction(err) => write!(f, "Transaction error: {}", err),
277            KafkaError::MockCluster(err) => write!(f, "Mock cluster error: {}", err),
278        }
279    }
280}
281
282impl Error for KafkaError {
283    fn source(&self) -> Option<&(dyn Error + 'static)> {
284        match self {
285            KafkaError::AdminOp(_) => None,
286            KafkaError::AdminOpCreation(_) => None,
287            KafkaError::Canceled => None,
288            KafkaError::ClientConfig(..) => None,
289            KafkaError::ClientCreation(_) => None,
290            KafkaError::ConsumerCommit(err) => Some(err),
291            KafkaError::Flush(err) => Some(err),
292            KafkaError::Global(err) => Some(err),
293            KafkaError::GroupListFetch(err) => Some(err),
294            KafkaError::MessageConsumption(err) => Some(err),
295            KafkaError::MessageProduction(err) => Some(err),
296            KafkaError::MetadataFetch(err) => Some(err),
297            KafkaError::NoMessageReceived => None,
298            KafkaError::Nul(_) => None,
299            KafkaError::OffsetFetch(err) => Some(err),
300            KafkaError::PartitionEOF(_) => None,
301            KafkaError::PauseResume(_) => None,
302            KafkaError::Rebalance(err) => Some(err),
303            KafkaError::Seek(_) => None,
304            KafkaError::SetPartitionOffset(err) => Some(err),
305            KafkaError::StoreOffset(err) => Some(err),
306            KafkaError::Subscription(_) => None,
307            KafkaError::Transaction(err) => Some(err),
308            KafkaError::MockCluster(err) => Some(err),
309        }
310    }
311}
312
313impl From<ffi::NulError> for KafkaError {
314    fn from(err: ffi::NulError) -> KafkaError {
315        KafkaError::Nul(err)
316    }
317}
318
319impl KafkaError {
320    /// Returns the [`RDKafkaErrorCode`] underlying this error, if any.
321    #[allow(clippy::match_same_arms)]
322    pub fn rdkafka_error_code(&self) -> Option<RDKafkaErrorCode> {
323        match self {
324            KafkaError::AdminOp(_) => None,
325            KafkaError::AdminOpCreation(_) => None,
326            KafkaError::Canceled => None,
327            KafkaError::ClientConfig(..) => None,
328            KafkaError::ClientCreation(_) => None,
329            KafkaError::ConsumerCommit(err) => Some(*err),
330            KafkaError::Flush(err) => Some(*err),
331            KafkaError::Global(err) => Some(*err),
332            KafkaError::GroupListFetch(err) => Some(*err),
333            KafkaError::MessageConsumption(err) => Some(*err),
334            KafkaError::MessageProduction(err) => Some(*err),
335            KafkaError::MetadataFetch(err) => Some(*err),
336            KafkaError::NoMessageReceived => None,
337            KafkaError::Nul(_) => None,
338            KafkaError::OffsetFetch(err) => Some(*err),
339            KafkaError::PartitionEOF(_) => None,
340            KafkaError::PauseResume(_) => None,
341            KafkaError::Rebalance(err) => Some(*err),
342            KafkaError::Seek(_) => None,
343            KafkaError::SetPartitionOffset(err) => Some(*err),
344            KafkaError::StoreOffset(err) => Some(*err),
345            KafkaError::Subscription(_) => None,
346            KafkaError::Transaction(err) => Some(err.code()),
347            KafkaError::MockCluster(err) => Some(*err),
348        }
349    }
350}