madsim_rdkafka/std/mocking.rs
//! Mocking functionality
//!
//! Provides a mock Kafka cluster with a configurable number of brokers that support a reasonable
//! subset of Kafka protocol operations, error injection, etc.
//!
//! There are two ways to use the mock clusters. The simplest approach is to configure
//! `test.mock.num.brokers` (to e.g. 3) in an existing application, which will replace the
//! configured `bootstrap.servers` with the mock cluster brokers.
//!
//! This approach is a convenient way to test existing applications.
//!
//! The second approach is to explicitly create a mock cluster by using `MockCluster::new`.
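//!
//! # Example
//!
//! A minimal sketch of the explicit approach, modeled on the test at the bottom of this
//! module. The import paths and the async `create().await` call mirror how the API is used
//! in that test; the block is not compiled as a doctest, so treat it as illustrative only.
//!
//! ```ignore
//! use rdkafka::mocking::MockCluster;
//! use rdkafka::producer::{FutureProducer, FutureRecord};
//! use rdkafka::ClientConfig;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Start a mock cluster with two brokers and point a producer at it.
//!     let mock_cluster = MockCluster::new(2).unwrap();
//!     let producer: FutureProducer = ClientConfig::new()
//!         .set("bootstrap.servers", &mock_cluster.bootstrap_servers())
//!         .create()
//!         .await
//!         .expect("producer creation error");
//!
//!     // Produce a record; the mock cluster auto-creates the topic.
//!     let record = FutureRecord::to("test_topic").key("msg1").payload("test");
//!     producer.send_result(record).unwrap().await.unwrap().unwrap();
//! }
//! ```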

use std::convert::TryInto;
use std::ffi::{CStr, CString};
use std::os::raw::c_int;
use std::time::Duration;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::Client;
use crate::config::ClientConfig;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::producer::DefaultProducerContext;
use crate::ClientContext;

/// Used internally by `MockCluster` to distinguish whether the mock cluster is owned or borrowed.
///
/// The mock cluster can be created in two ways:
///
/// - With `rd_kafka_mock_cluster_new()`. In this case, the caller of the constructor is
///   responsible for destroying the returned mock cluster instance.
///
/// - By setting `test.mock.num.brokers` in the configuration of a producer/consumer client.
///   In this case, the client creates the mock cluster internally and destroys it in its
///   destructor, and we only hold a reference to the mock cluster obtained with
///   `rd_kafka_handle_mock_cluster()` (cf. `Client::mock_cluster()`).
///
/// In the latter case, we **must neither** destroy the mock cluster in `MockCluster`'s `drop()`,
/// **nor** outlive the `Client` from which the reference is obtained, hence the lifetime.
#[allow(dead_code)]
enum MockClusterClient<'c, C: ClientContext> {
    Owned(Client<C>),
    Borrowed(&'c Client<C>),
}

/// Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of
/// Kafka protocol operations, error injection, etc.
///
/// Mock clusters provide localhost listeners that can be used as the bootstrap
/// servers by multiple Kafka client instances.
///
/// Currently supported functionality:
/// - Producer
/// - Idempotent Producer
/// - Transactional Producer
/// - Low-level consumer
/// - High-level balanced consumer groups with offset commits
/// - Topic Metadata and auto creation
///
/// The mock cluster can be created either with [`MockCluster::new()`]
/// or by configuring the `test.mock.num.brokers` property when creating a producer/consumer.
/// The latter overrides that producer/consumer's bootstrap servers setting and internally
/// creates a mock cluster. You can then obtain this mock cluster using [`Client::mock_cluster()`].
///
/// **Warning**: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
///
/// [`MockCluster::new()`]: MockCluster::new()
/// [`Client::mock_cluster()`]: crate::client::Client::mock_cluster()
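///
/// # Example
///
/// A sketch of the configuration-driven approach. It assumes the `Producer::client()`
/// accessor and the [`Client::mock_cluster()`] method referenced above; the block is not
/// compiled as a doctest, so treat it as illustrative only.
///
/// ```ignore
/// use rdkafka::producer::{FutureProducer, Producer};
/// use rdkafka::ClientConfig;
///
/// # async fn example() {
/// // Requesting mock brokers makes the client ignore `bootstrap.servers`
/// // and spin up an internal mock cluster instead.
/// let producer: FutureProducer = ClientConfig::new()
///     .set("test.mock.num.brokers", "3")
///     .create()
///     .await
///     .expect("producer creation error");
///
/// // Borrow the internally created mock cluster from the client.
/// let mock_cluster = producer.client().mock_cluster().expect("no mock cluster");
/// println!("mock brokers: {}", mock_cluster.bootstrap_servers());
/// # }
/// ```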
pub struct MockCluster<'c, C: ClientContext> {
    mock_cluster: *mut RDKafkaMockCluster,
    client: MockClusterClient<'c, C>,
}

/// Utility macro to simplify returns for operations done on the mock API
macro_rules! return_mock_op {
    ($op:expr) => {
        match $op {
            err if err.is_error() => Err(KafkaError::MockCluster(err.into())),
            _ => Ok(()),
        }
    };
}

/// Used to denote an explicitly configured coordinator
pub enum MockCoordinator {
    /// Mock out coordination by a given transaction id
    Transaction(String),
    /// Mock out coordination by a given group id
    Group(String),
}

impl MockCluster<'static, DefaultProducerContext> {
    /// Creates a new mock cluster with the given number of brokers
    pub fn new(broker_count: i32) -> KafkaResult<Self> {
        let config = ClientConfig::new();
        let native_config = config.create_native_config()?;
        let context = DefaultProducerContext {};

        let client = Client::new(
            &config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;

        let mock_cluster =
            unsafe { rdsys::rd_kafka_mock_cluster_new(client.native_ptr(), broker_count) };
        if mock_cluster.is_null() {
            return Err(KafkaError::MockCluster(rdsys::RDKafkaErrorCode::Fail));
        }

        Ok(MockCluster {
            mock_cluster,
            client: MockClusterClient::Owned(client),
        })
    }
}

impl<'c, C> MockCluster<'c, C>
where
    C: ClientContext,
{
    /// Returns the mock cluster associated with the given client, if any.
    pub(crate) fn from_client(client: &'c Client<C>) -> Option<Self> {
        let mock_cluster = unsafe { rdsys::rd_kafka_handle_mock_cluster(client.native_ptr()) };
        if mock_cluster.is_null() {
            return None;
        }

        Some(MockCluster {
            mock_cluster,
            client: MockClusterClient::Borrowed(client),
        })
    }

    /// Returns the mock cluster's `bootstrap.servers` list.
    pub fn bootstrap_servers(&self) -> String {
        let bootstrap =
            unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
        bootstrap.to_string_lossy().to_string()
    }

    /// Clear the cluster's error state for the given ApiKey.
    pub fn clear_request_errors(&self, api_key: RDKafkaApiKey) {
        unsafe { rdsys::rd_kafka_mock_clear_request_errors(self.mock_cluster, api_key.into()) }
    }

    /// Push errors onto the cluster's error stack for the given ApiKey.
    ///
    /// Protocol requests matching the given ApiKey will fail with the provided error codes,
    /// which are consumed from the stack in order: the first request fails with the first
    /// error code, the next with the second, and so on.
    ///
    /// Passing `RD_KAFKA_RESP_ERR__TRANSPORT` will make the mock broker disconnect the
    /// client, which can be useful for triggering a disconnect on certain requests.
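    ///
    /// # Example
    ///
    /// A sketch of injecting a single transport error for Produce requests. The
    /// `RDKafkaApiKey::RD_KAFKAP_Produce` variant name is an assumption about the
    /// rdkafka-sys bindings; the block is not compiled as a doctest.
    ///
    /// ```ignore
    /// use rdkafka_sys::types::{RDKafkaApiKey, RDKafkaRespErr};
    ///
    /// // The next Produce request will be answered with a broker disconnect.
    /// mock_cluster.request_errors(
    ///     RDKafkaApiKey::RD_KAFKAP_Produce,
    ///     &[RDKafkaRespErr::RD_KAFKA_RESP_ERR__TRANSPORT],
    /// );
    /// ```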
    pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr]) {
        unsafe {
            rdsys::rd_kafka_mock_push_request_errors_array(
                self.mock_cluster,
                api_key.into(),
                errors.len(),
                errors.as_ptr(),
            )
        }
    }

    /// Set the topic error to return in protocol requests.
    ///
    /// Currently this is only used for TopicMetadataRequest and AddPartitionsToTxnRequest.
    pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()> {
        let topic_c = CString::new(topic)?;
        unsafe { rdsys::rd_kafka_mock_topic_set_error(self.mock_cluster, topic_c.as_ptr(), error) }
        Ok(())
    }

    /// Create a topic.
    ///
    /// This is an alternative to automatic topic creation as performed by the client itself.
    ///
    /// NOTE: The Topic Admin API (CreateTopics) is not supported by the mock broker.
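    ///
    /// # Example
    ///
    /// A sketch of pre-creating a topic instead of relying on auto creation (illustrative
    /// only; not compiled as a doctest):
    ///
    /// ```ignore
    /// // Three partitions, replicated across both mock brokers.
    /// mock_cluster.create_topic("test_topic", 3, 2).unwrap();
    /// ```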
    pub fn create_topic(
        &self,
        topic: &str,
        partition_count: i32,
        replication_factor: i32,
    ) -> KafkaResult<()> {
        let topic_c = CString::new(topic)?;
        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_topic_create(
                    self.mock_cluster,
                    topic_c.as_ptr(),
                    partition_count,
                    replication_factor,
                )
            }
        }
    }

    /// Sets the partition leader.
    ///
    /// The topic will be created if it does not exist.
    ///
    /// `broker_id` needs to be an existing broker, or `None` to make the partition leader-less.
    pub fn partition_leader(
        &self,
        topic: &str,
        partition: i32,
        broker_id: Option<i32>,
    ) -> KafkaResult<()> {
        let topic_c = CString::new(topic)?;
        let broker_id = broker_id.unwrap_or(-1);

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_partition_set_leader(
                    self.mock_cluster,
                    topic_c.as_ptr(),
                    partition,
                    broker_id,
                )
            }
        }
    }

    /// Sets the partition's preferred replica / follower.
    ///
    /// The topic will be created if it does not exist.
    ///
    /// `broker_id` does not need to point to an existing broker.
    pub fn partition_follower(
        &self,
        topic: &str,
        partition: i32,
        broker_id: i32,
    ) -> KafkaResult<()> {
        let topic_c = CString::new(topic)?;

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_partition_set_follower(
                    self.mock_cluster,
                    topic_c.as_ptr(),
                    partition,
                    broker_id,
                )
            }
        }
    }

    /// Sets the partition's preferred replica / follower low and high watermarks.
    ///
    /// The topic will be created if it does not exist.
    ///
    /// Setting an offset to `None` will revert back to the leader's corresponding watermark.
    pub fn follower_watermarks(
        &self,
        topic: &str,
        partition: i32,
        low_watermark: Option<i64>,
        high_watermark: Option<i64>,
    ) -> KafkaResult<()> {
        let topic_c = CString::new(topic)?;
        let low_watermark = low_watermark.unwrap_or(-1);
        let high_watermark = high_watermark.unwrap_or(-1);

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_partition_set_follower_wmarks(
                    self.mock_cluster,
                    topic_c.as_ptr(),
                    partition,
                    low_watermark,
                    high_watermark,
                )
            }
        }
    }

    /// Disconnects the broker and disallows any new connections.
    /// Use -1 for all brokers, or >= 0 for a specific broker.
    ///
    /// NOTE: This does NOT trigger leader change.
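    ///
    /// # Example
    ///
    /// A sketch of simulating a temporary broker outage (illustrative only; not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// // Take broker 1 offline, exercise the client's retry behavior, then recover.
    /// mock_cluster.broker_down(1).unwrap();
    /// // ... run the code under test against the degraded cluster ...
    /// mock_cluster.broker_up(1).unwrap();
    /// ```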
    pub fn broker_down(&self, broker_id: i32) -> KafkaResult<()> {
        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_broker_set_down(self.mock_cluster, broker_id)
            }
        }
    }

    /// Makes the broker accept connections again.
    /// Use -1 for all brokers, or >= 0 for a specific broker.
    ///
    /// NOTE: This does NOT trigger leader change.
    pub fn broker_up(&self, broker_id: i32) -> KafkaResult<()> {
        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_broker_set_up(self.mock_cluster, broker_id)
            }
        }
    }

    /// Sets the broker's round-trip-time delay.
    /// Use -1 for all brokers, or >= 0 for a specific broker.
    ///
    /// The delay is passed to librdkafka in milliseconds and is clamped to `c_int::MAX`
    /// milliseconds if it would overflow.
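    ///
    /// # Example
    ///
    /// A sketch of adding artificial latency to every broker (illustrative only; not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// // Every request to any broker now takes an extra 200 ms round trip.
    /// mock_cluster.broker_round_trip_time(-1, Duration::from_millis(200)).unwrap();
    /// ```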
    pub fn broker_round_trip_time(&self, broker_id: i32, delay: Duration) -> KafkaResult<()> {
        let rtt_ms = delay.as_millis().try_into().unwrap_or(c_int::MAX);

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_broker_set_rtt(
                    self.mock_cluster,
                    broker_id,
                    rtt_ms,
                )
            }
        }
    }

    /// Sets the broker's rack as reported in Metadata to the client.
    /// Use -1 for all brokers, or >= 0 for a specific broker.
    pub fn broker_rack(&self, broker_id: i32, rack: &str) -> KafkaResult<()> {
        let rack_c = CString::new(rack)?;
        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_broker_set_rack(
                    self.mock_cluster,
                    broker_id,
                    rack_c.as_ptr(),
                )
            }
        }
    }

    /// Explicitly sets the coordinator.
    ///
    /// If this API is not used, a standard hashing scheme will be used.
    ///
    /// `broker_id` does not need to point to an existing broker.
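    ///
    /// # Example
    ///
    /// A sketch of pinning the group coordinator for a consumer group to broker 2
    /// (illustrative only; not compiled as a doctest):
    ///
    /// ```ignore
    /// mock_cluster
    ///     .coordinator(MockCoordinator::Group("rust-rdkafka-mockcluster-test".into()), 2)
    ///     .unwrap();
    /// ```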
    pub fn coordinator(&self, coordinator: MockCoordinator, broker_id: i32) -> KafkaResult<()> {
        let (kind, key) = match coordinator {
            MockCoordinator::Transaction(key) => ("transaction", key),
            MockCoordinator::Group(key) => ("group", key),
        };

        let kind_c = CString::new(kind)?;
        let raw_c = CString::new(key)?;

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_coordinator_set(
                    self.mock_cluster,
                    kind_c.as_ptr(),
                    raw_c.as_ptr(),
                    broker_id,
                )
            }
        }
    }

    /// Set the allowed ApiVersion range for the given ApiKey.
    ///
    /// Set `min_version` and `max_version` to `None` to disable the API completely.
    /// `max_version` MUST NOT exceed the maximum implemented value.
    pub fn apiversion(
        &self,
        api_key: RDKafkaApiKey,
        min_version: Option<i16>,
        max_version: Option<i16>,
    ) -> KafkaResult<()> {
        let min_version = min_version.unwrap_or(-1);
        let max_version = max_version.unwrap_or(-1);

        return_mock_op! {
            unsafe {
                rdsys::rd_kafka_mock_set_apiversion(
                    self.mock_cluster,
                    api_key.into(),
                    min_version,
                    max_version,
                )
            }
        }
    }
}

impl<'c, C> Drop for MockCluster<'c, C>
where
    C: ClientContext,
{
    fn drop(&mut self) {
        if let MockClusterClient::Owned(..) = self.client {
            unsafe {
                rdsys::rd_kafka_mock_cluster_destroy(self.mock_cluster);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::consumer::{Consumer, StreamConsumer};
    use crate::message::ToBytes;
    use crate::producer::{FutureProducer, FutureRecord};
    use crate::Message;
    use tokio;

    use super::*;

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_mockcluster() {
        const TOPIC: &str = "test_topic";
        let mock_cluster = MockCluster::new(2).unwrap();

        let bootstrap_servers = mock_cluster.bootstrap_servers();

        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .create()
            .await
            .expect("Producer creation error");

        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("group.id", "rust-rdkafka-mockcluster-test")
            .set("auto.offset.reset", "earliest")
            .create()
            .await
            .expect("Client creation error");

        let rec = FutureRecord::to(TOPIC).key("msg1").payload("test");
        producer.send_result(rec).unwrap().await.unwrap().unwrap();

        consumer.subscribe(&[TOPIC]).unwrap();

        let msg = consumer.recv().await.unwrap();
        assert_eq!(msg.key(), Some("msg1".to_bytes()));
        assert_eq!(msg.payload(), Some("test".to_bytes()));
    }
}
438}