madsim_rdkafka/std/
mocking.rs

1//! Mocking functionality
2//!
3//! Provides a mock Kafka cluster with a configurable number of brokers that support a reasonable
4//! subset of Kafka protocol operations, error injection, etc.
5//!
6//! There are two ways to use the mock clusters, the most simple approach is to configure
7//! `test.mock.num.brokers` (to e.g. 3) in an existing application, which will replace the
8//! configured `bootstrap.servers` with the mock cluster brokers.
9//!
10//! This approach is convenient to easily test existing applications.
11//!
12//! The second approach is to explicitly create a mock cluster by using `MockCluster::new`
13
14use std::convert::TryInto;
15use std::ffi::{CStr, CString};
16use std::os::raw::c_int;
17use std::time::Duration;
18
19use rdkafka_sys as rdsys;
20use rdkafka_sys::types::*;
21
22use crate::client::Client;
23use crate::config::ClientConfig;
24use crate::error::{IsError, KafkaError, KafkaResult};
25use crate::producer::DefaultProducerContext;
26use crate::ClientContext;
27
28/// Used internally by `MockCluster` to distinguish whether the mock cluster is owned or borrowed.
29///
30/// The mock cluster can be created in two ways:
31///
32/// - With `rd_kafka_mock_cluster_new()`. In this case the caller of the c-tor is responsible
33///   for destroying the returned mock cluster instance.
34///
35/// - By setting `test.mock.num.brokers` in a configuration of a producer/consumer client.
36///   In this case, the client creates the mock cluster internally and destroys it in its d-tor,
37///   and we only hold a reference to the mock cluster obtained with `rd_kafka_handle_mock_cluster()` (cf. `Client::mock_cluster()`).
38///
39///   In this case, we **must neither** destroy the mock cluster in `MockCluster`'s `drop()`,
40///   **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
41#[allow(dead_code)]
42enum MockClusterClient<'c, C: ClientContext> {
43    Owned(Client<C>),
44    Borrowed(&'c Client<C>),
45}
46
47/// Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of
48/// Kafka protocol operations, error injection, etc.
49///
50/// Mock clusters provide localhost listeners that can be used as the bootstrap
51/// servers by multiple Kafka client instances.
52///
53/// Currently supported functionality:
54/// - Producer
55/// - Idempotent Producer
56/// - Transactional Producer
57/// - Low-level consumer
58/// - High-level balanced consumer groups with offset commits
59/// - Topic Metadata and auto creation
60///
61/// The mock cluster can be either created with [`MockCluster::new()`]
62/// or by configuring the `test.mock.num.brokers` property when creating a producer/consumer.
63/// This will override that producer/consumer's bootstrap servers setting and internally
64/// create a mock cluster. You can then obtain this mock cluster using [`Client::mock_cluster()`].
65///
66/// Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
67///
68/// [`MockCluster::new()`]: MockCluster::new()
69/// [`Client::mock_cluster()`]: crate::client::Client::mock_cluster()
70pub struct MockCluster<'c, C: ClientContext> {
71    mock_cluster: *mut RDKafkaMockCluster,
72    client: MockClusterClient<'c, C>,
73}
74
75/// Utility macro to simplify returns for operations done on the mock API
76macro_rules! return_mock_op {
77    ($op:expr) => {
78        match $op {
79            err if err.is_error() => Err(KafkaError::MockCluster(err.into())),
80            _ => Ok(()),
81        }
82    };
83}
84
85/// Used to denote an explictly configured coordinator
86pub enum MockCoordinator {
87    /// Mock out coordination by a given transaction id
88    Transaction(String),
89    /// Mock out coordination by a given group id
90    Group(String),
91}
92
93impl MockCluster<'static, DefaultProducerContext> {
94    /// Creates a new mock cluster with the given number of brokers
95    pub fn new(broker_count: i32) -> KafkaResult<Self> {
96        let config = ClientConfig::new();
97        let native_config = config.create_native_config()?;
98        let context = DefaultProducerContext {};
99
100        let client = Client::new(
101            &config,
102            native_config,
103            RDKafkaType::RD_KAFKA_PRODUCER,
104            context,
105        )?;
106
107        let mock_cluster =
108            unsafe { rdsys::rd_kafka_mock_cluster_new(client.native_ptr(), broker_count) };
109        if mock_cluster.is_null() {
110            return Err(KafkaError::MockCluster(rdsys::RDKafkaErrorCode::Fail));
111        }
112
113        Ok(MockCluster {
114            mock_cluster,
115            client: MockClusterClient::Owned(client),
116        })
117    }
118}
119
120impl<'c, C> MockCluster<'c, C>
121where
122    C: ClientContext,
123{
124    /// Returns the mock cluster associated with the given client if any
125    pub(crate) fn from_client(client: &'c Client<C>) -> Option<Self> {
126        let mock_cluster = unsafe { rdsys::rd_kafka_handle_mock_cluster(client.native_ptr()) };
127        if mock_cluster.is_null() {
128            return None;
129        }
130
131        Some(MockCluster {
132            mock_cluster,
133            client: MockClusterClient::Borrowed(client),
134        })
135    }
136
137    /// Returns the mock cluster's bootstrap.servers list
138    pub fn bootstrap_servers(&self) -> String {
139        let bootstrap =
140            unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
141        bootstrap.to_string_lossy().to_string()
142    }
143
144    /// Clear the cluster's error state for the given ApiKey.
145    pub fn clear_request_errors(&self, api_key: RDKafkaApiKey) {
146        unsafe { rdsys::rd_kafka_mock_clear_request_errors(self.mock_cluster, api_key.into()) }
147    }
148
149    /// Push errors onto the cluster's error stack for the given ApiKey.
150    ///
151    /// The protocol requests matching the given ApiKey will fail with the
152    /// provided error code and removed from the stack, starting with
153    /// the first error code, then the second, etc.
154    ///
155    /// Passing RD_KAFKA_RESP_ERR__TRANSPORT will make the mock broker
156    /// disconnect the client which can be useful to trigger a disconnect
157    /// on certain requests.
158    pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr]) {
159        unsafe {
160            rdsys::rd_kafka_mock_push_request_errors_array(
161                self.mock_cluster,
162                api_key.into(),
163                errors.len(),
164                errors.as_ptr(),
165            )
166        }
167    }
168
169    /// Set the topic error to return in protocol requests.
170    ///
171    /// Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest.
172    pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()> {
173        let topic_c = CString::new(topic)?;
174        unsafe { rdsys::rd_kafka_mock_topic_set_error(self.mock_cluster, topic_c.as_ptr(), error) }
175        Ok(())
176    }
177
178    /// Create a topic
179    ///
180    /// This is an alternative to automatic topic creation as performed by the client itself.
181    ///
182    /// NOTE: The Topic Admin API (CreateTopics) is not supported by the mock broker
183    pub fn create_topic(
184        &self,
185        topic: &str,
186        partition_count: i32,
187        replication_factor: i32,
188    ) -> KafkaResult<()> {
189        let topic_c = CString::new(topic)?;
190        return_mock_op! {
191            unsafe {
192                rdsys::rd_kafka_mock_topic_create(
193                    self.mock_cluster,
194                    topic_c.as_ptr(),
195                    partition_count,
196                    replication_factor,
197                )
198            }
199        }
200    }
201
202    /// Sets the partition leader
203    ///
204    /// The topic will be created if it does not exist.
205    ///
206    /// `broker_id` needs to be an existing broker, or None to make the partition leader-less.
207    pub fn partition_leader(
208        &self,
209        topic: &str,
210        partition: i32,
211        broker_id: Option<i32>,
212    ) -> KafkaResult<()> {
213        let topic_c = CString::new(topic)?;
214        let broker_id = broker_id.unwrap_or(-1);
215
216        return_mock_op! {
217            unsafe {
218                rdsys::rd_kafka_mock_partition_set_leader(
219                    self.mock_cluster,
220                    topic_c.as_ptr(),
221                    partition,
222                    broker_id,
223                )
224            }
225        }
226    }
227
228    /// Sets the partition's preferred replica / follower.
229    ///
230    /// The topic will be created if it does not exist.
231    ///
232    /// `broker_id` does not need to point to an existing broker.
233    pub fn partition_follower(
234        &self,
235        topic: &str,
236        partition: i32,
237        broker_id: i32,
238    ) -> KafkaResult<()> {
239        let topic_c = CString::new(topic)?;
240
241        return_mock_op! {
242            unsafe {
243                rdsys::rd_kafka_mock_partition_set_follower(
244                    self.mock_cluster, topic_c.as_ptr(), partition, broker_id)
245            }
246        }
247    }
248
249    /// Sets the partition's preferred replica / follower low and high watermarks.
250    ///
251    /// The topic will be created if it does not exist.
252    ///
253    /// Setting an offset to `None` will revert back to the leader's corresponding watermark.
254    pub fn follower_watermarks(
255        &self,
256        topic: &str,
257        partition: i32,
258        low_watermark: Option<i64>,
259        high_watermark: Option<i64>,
260    ) -> KafkaResult<()> {
261        let topic_c = CString::new(topic)?;
262        let low_watermark = low_watermark.unwrap_or(-1);
263        let high_watermark = high_watermark.unwrap_or(-1);
264
265        return_mock_op! {
266            unsafe {
267                rdsys::rd_kafka_mock_partition_set_follower_wmarks(
268                    self.mock_cluster,
269                    topic_c.as_ptr(),
270                    partition,
271                    low_watermark,
272                    high_watermark
273                )
274            }
275        }
276    }
277
278    /// Disconnects the broker and disallows any new connections.
279    /// Use -1 for all brokers, or >= 0 for a specific broker.
280    ///
281    /// NOTE: This does NOT trigger leader change.
282    pub fn broker_down(&self, broker_id: i32) -> KafkaResult<()> {
283        return_mock_op! {
284            unsafe {
285                rdsys::rd_kafka_mock_broker_set_down(self.mock_cluster, broker_id)
286            }
287        }
288    }
289
290    /// Makes the broker accept connections again.
291    /// Use -1 for all brokers, or >= 0 for a specific broker.
292    ///
293    /// NOTE: This does NOT trigger leader change.
294    pub fn broker_up(&self, broker_id: i32) -> KafkaResult<()> {
295        return_mock_op! {
296            unsafe {
297                rdsys::rd_kafka_mock_broker_set_up(self.mock_cluster, broker_id)
298            }
299        }
300    }
301
302    /// Set broker round-trip-time delay in milliseconds.
303    /// Use -1 for all brokers, or >= 0 for a specific broker.
304    pub fn broker_round_trip_time(&self, broker_id: i32, delay: Duration) -> KafkaResult<()> {
305        let rtt_ms = delay.as_millis().try_into().unwrap_or(c_int::MAX);
306
307        return_mock_op! {
308            unsafe {
309                rdsys::rd_kafka_mock_broker_set_rtt(
310                    self.mock_cluster,
311                    broker_id,
312                    rtt_ms
313                )
314            }
315        }
316    }
317
318    /// Sets the broker's rack as reported in Metadata to the client.
319    /// Use -1 for all brokers, or >= 0 for a specific broker.
320    pub fn broker_rack(&self, broker_id: i32, rack: &str) -> KafkaResult<()> {
321        let rack_c = CString::new(rack)?;
322        return_mock_op! {
323            unsafe {
324                rdsys::rd_kafka_mock_broker_set_rack(
325                    self.mock_cluster,
326                    broker_id,
327                    rack_c.as_ptr()
328                )
329            }
330        }
331    }
332
333    /// Explicitly sets the coordinator.
334    ///
335    /// If this API is not a standard hashing scheme will be used.
336    ///
337    /// `broker_id` does not need to point to an existing broker.
338    pub fn coordinator(&self, coordinator: MockCoordinator, broker_id: i32) -> KafkaResult<()> {
339        let (kind, key) = match coordinator {
340            MockCoordinator::Transaction(key) => ("transaction", key),
341            MockCoordinator::Group(key) => ("group", key),
342        };
343
344        let kind_c = CString::new(kind)?;
345        let raw_c = CString::new(key)?;
346
347        return_mock_op! {
348            unsafe {
349                rdsys::rd_kafka_mock_coordinator_set(
350                    self.mock_cluster,
351                    kind_c.as_ptr(),
352                    raw_c.as_ptr(),
353                    broker_id
354                )
355            }
356        }
357    }
358
359    /// Set the allowed ApiVersion range for the given ApiKey.
360    ///
361    /// Set min_version and max_version to `None` to disable the API completely.
362    /// max_version MUST not exceed the maximum implemented value.
363    pub fn apiversion(
364        &self,
365        api_key: RDKafkaApiKey,
366        min_version: Option<i16>,
367        max_version: Option<i16>,
368    ) -> KafkaResult<()> {
369        let min_version = min_version.unwrap_or(-1);
370        let max_version = max_version.unwrap_or(-1);
371
372        return_mock_op! {
373            unsafe {
374                rdsys::rd_kafka_mock_set_apiversion(
375                    self.mock_cluster,
376                    api_key.into(),
377                    min_version,
378                    max_version,
379                )
380            }
381        }
382    }
383}
384
385impl<'c, C> Drop for MockCluster<'c, C>
386where
387    C: ClientContext,
388{
389    fn drop(&mut self) {
390        if let MockClusterClient::Owned(..) = self.client {
391            unsafe {
392                rdsys::rd_kafka_mock_cluster_destroy(self.mock_cluster);
393            }
394        }
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use crate::consumer::{Consumer, StreamConsumer};
401    use crate::message::ToBytes;
402    use crate::producer::{FutureProducer, FutureRecord};
403    use crate::Message;
404    use tokio;
405
406    use super::*;
407
408    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
409    async fn test_mockcluster() {
410        const TOPIC: &str = "test_topic";
411        let mock_cluster = MockCluster::new(2).unwrap();
412
413        let bootstrap_servers = mock_cluster.bootstrap_servers();
414
415        let producer: FutureProducer = ClientConfig::new()
416            .set("bootstrap.servers", &bootstrap_servers)
417            .create()
418            .await
419            .expect("Producer creation error");
420
421        let consumer: StreamConsumer = ClientConfig::new()
422            .set("bootstrap.servers", &bootstrap_servers)
423            .set("group.id", "rust-rdkafka-mockcluster-test")
424            .set("auto.offset.reset", "earliest")
425            .create()
426            .await
427            .expect("Client creation error");
428
429        let rec = FutureRecord::to(TOPIC).key("msg1").payload("test");
430        producer.send_result(rec).unwrap().await.unwrap().unwrap();
431
432        consumer.subscribe(&[TOPIC]).unwrap();
433
434        let msg = consumer.recv().await.unwrap();
435        assert_eq!(msg.key(), Some("msg1".to_bytes()));
436        assert_eq!(msg.payload(), Some("test".to_bytes()));
437    }
438}