pub struct MockCluster<'c, C: ClientContext> { /* private fields */ }
Mock Kafka cluster with a configurable number of brokers that support a reasonable subset of Kafka protocol operations, error injection, etc.
Mock clusters provide localhost listeners that can be used as the bootstrap servers by multiple Kafka client instances.
Currently supported functionality:
- Producer
- Idempotent Producer
- Transactional Producer
- Low-level consumer
- High-level balanced consumer groups with offset commits
- Topic Metadata and auto creation
The mock cluster can be created either with MockCluster::new() or by setting the test.mock.num.brokers property when creating a producer/consumer. Setting the property overrides that producer/consumer’s bootstrap servers setting and internally creates a mock cluster. You can then obtain this mock cluster using Client::mock_cluster().
Warning: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.
Implementations
impl MockCluster<'static, DefaultProducerContext>
pub fn new(broker_count: i32) -> KafkaResult<Self>
Creates a new mock cluster with the given number of brokers.
impl<'c, C> MockCluster<'c, C> where
C: ClientContext,
pub fn bootstrap_servers(&self) -> String
Returns the mock cluster’s bootstrap.servers list.
pub fn clear_request_errors(&self, api_key: RDKafkaApiKey)
Clear the cluster’s error state for the given ApiKey.
pub fn request_errors(&self, api_key: RDKafkaApiKey, errors: &[RDKafkaRespErr])
Push errors onto the cluster’s error stack for the given ApiKey.
Subsequent protocol requests matching the given ApiKey will fail with the provided error codes. Each error code is removed from the stack once it has been returned, starting with the first error code, then the second, and so on.
Passing RD_KAFKA_RESP_ERR__TRANSPORT makes the mock broker disconnect the client, which can be useful for triggering a disconnect on certain requests.
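The ordering described above can be illustrated with a small standalone model. This is only a sketch of the documented behaviour, not rdkafka or librdkafka code: ApiKey, RespErr, ErrorInjector and handle_request are hypothetical stand-ins invented for the illustration, and the model assumes that a request with no pending error simply succeeds.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for RDKafkaApiKey (illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum ApiKey {
    Produce,
    Fetch,
}

/// Hypothetical stand-in for RDKafkaRespErr (illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RespErr {
    NotEnoughReplicas,
    Transport,
}

/// Toy model of the per-ApiKey list of injected errors.
#[derive(Default)]
struct ErrorInjector {
    pending: HashMap<ApiKey, VecDeque<RespErr>>,
}

impl ErrorInjector {
    /// Models request_errors: appends the errors in the order given.
    fn request_errors(&mut self, api_key: ApiKey, errors: &[RespErr]) {
        self.pending
            .entry(api_key)
            .or_default()
            .extend(errors.iter().copied());
    }

    /// Models clear_request_errors: drops all pending errors for the key.
    fn clear_request_errors(&mut self, api_key: ApiKey) {
        self.pending.remove(&api_key);
    }

    /// Models one incoming request: fails with the oldest pending error
    /// for that key, or succeeds if none is pending (an assumption).
    fn handle_request(&mut self, api_key: ApiKey) -> Result<(), RespErr> {
        match self.pending.get_mut(&api_key).and_then(|q| q.pop_front()) {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}

fn main() {
    let mut injector = ErrorInjector::default();
    injector.request_errors(
        ApiKey::Produce,
        &[RespErr::NotEnoughReplicas, RespErr::Transport],
    );

    // Errors come back first-in, first-out, and only for the matching key.
    assert_eq!(injector.handle_request(ApiKey::Fetch), Ok(()));
    assert_eq!(
        injector.handle_request(ApiKey::Produce),
        Err(RespErr::NotEnoughReplicas)
    );

    // Clearing discards whatever is still pending for that key.
    injector.clear_request_errors(ApiKey::Produce);
    assert_eq!(injector.handle_request(ApiKey::Produce), Ok(()));
}
```

Although the documentation calls it a stack, the errors are consumed in the order they were pushed, which is why the model uses a queue.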
pub fn topic_error(&self, topic: &str, error: RDKafkaRespErr) -> KafkaResult<()>
Set the topic error to return in protocol requests.
Currently only used for TopicMetadataRequest and AddPartitionsToTxnRequest.
pub fn create_topic(
    &self,
    topic: &str,
    partition_count: i32,
    replication_factor: i32,
) -> KafkaResult<()>
Creates a topic.
This is an alternative to automatic topic creation as performed by the client itself.
NOTE: The Topic Admin API (CreateTopics) is not supported by the mock broker.
pub fn partition_leader(
    &self,
    topic: &str,
    partition: i32,
    broker_id: Option<i32>,
) -> KafkaResult<()>
Sets the partition leader.
The topic will be created if it does not exist.
broker_id needs to be an existing broker, or None to make the partition leader-less.
pub fn partition_follower(
    &self,
    topic: &str,
    partition: i32,
    broker_id: i32,
) -> KafkaResult<()>
Sets the partition’s preferred replica / follower.
The topic will be created if it does not exist.
broker_id does not need to point to an existing broker.
pub fn follower_watermarks(
    &self,
    topic: &str,
    partition: i32,
    low_watermark: Option<i64>,
    high_watermark: Option<i64>,
) -> KafkaResult<()>
Sets the partition’s preferred replica / follower low and high watermarks.
The topic will be created if it does not exist.
Setting an offset to None reverts it to the leader’s corresponding watermark.
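The None fallback can be pictured with a minimal standalone sketch. It is a model of the documented rule only, not the mock broker's implementation; the Watermarks type and the effective_follower_watermarks function are hypothetical names introduced for the illustration.

```rust
/// Hypothetical pair of low/high watermark offsets (illustration only).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Watermarks {
    low: i64,
    high: i64,
}

/// Models the documented rule: an override of None falls back to the
/// leader's corresponding watermark, while Some(offset) replaces it.
fn effective_follower_watermarks(
    leader: Watermarks,
    low_watermark: Option<i64>,
    high_watermark: Option<i64>,
) -> Watermarks {
    Watermarks {
        low: low_watermark.unwrap_or(leader.low),
        high: high_watermark.unwrap_or(leader.high),
    }
}

fn main() {
    let leader = Watermarks { low: 10, high: 500 };

    // Override only the high watermark; the low one follows the leader.
    let follower = effective_follower_watermarks(leader, None, Some(450));
    assert_eq!(follower, Watermarks { low: 10, high: 450 });

    // Passing None for both reverts the follower to the leader's values.
    let reverted = effective_follower_watermarks(leader, None, None);
    assert_eq!(reverted, leader);
}
```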
pub fn broker_down(&self, broker_id: i32) -> KafkaResult<()>
Disconnects the broker and disallows any new connections. Use -1 for all brokers, or >= 0 for a specific broker.
NOTE: This does NOT trigger leader change.
pub fn broker_up(&self, broker_id: i32) -> KafkaResult<()>
Makes the broker accept connections again. Use -1 for all brokers, or >= 0 for a specific broker.
NOTE: This does NOT trigger leader change.
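The broker_id convention shared by broker_down and broker_up (-1 for all brokers, >= 0 for a specific one) can be sketched with a toy model. This is not rdkafka code: BrokerStates, set_up and up_ids are hypothetical names, and the broker id numbering from 1 and the error on an unknown id are assumptions of the sketch rather than documented behaviour.

```rust
use std::collections::BTreeMap;

/// Sentinel value that addresses every broker.
const ALL_BROKERS: i32 = -1;

/// Toy model of which brokers currently accept connections.
struct BrokerStates {
    /// Maps broker id to "accepts connections".
    up: BTreeMap<i32, bool>,
}

impl BrokerStates {
    /// Assumption: ids run from 1 to `count` and every broker starts up.
    fn new(count: i32) -> Self {
        BrokerStates {
            up: (1..=count).map(|id| (id, true)).collect(),
        }
    }

    /// Applies `up` to all brokers when `broker_id` is -1, otherwise to
    /// the single named broker. Rejecting an unknown id is an assumption.
    fn set_up(&mut self, broker_id: i32, up: bool) -> Result<(), String> {
        if broker_id == ALL_BROKERS {
            self.up.values_mut().for_each(|state| *state = up);
            return Ok(());
        }
        match self.up.get_mut(&broker_id) {
            Some(state) => {
                *state = up;
                Ok(())
            }
            None => Err(format!("unknown broker id {broker_id}")),
        }
    }

    /// Lists the ids of brokers that currently accept connections.
    fn up_ids(&self) -> Vec<i32> {
        self.up
            .iter()
            .filter(|(_, up)| **up)
            .map(|(id, _)| *id)
            .collect()
    }
}

fn main() {
    let mut brokers = BrokerStates::new(3);

    // Take a single broker down, as broker_down(2) would.
    brokers.set_up(2, false).unwrap();
    assert_eq!(brokers.up_ids(), vec![1, 3]);

    // Bring every broker back, as broker_up(-1) would.
    brokers.set_up(ALL_BROKERS, true).unwrap();
    assert_eq!(brokers.up_ids(), vec![1, 2, 3]);
}
```

The model tracks connectivity only. In line with the note above, it deliberately does nothing about partition leadership when a broker goes down or comes back.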
pub fn broker_round_trip_time(
    &self,
    broker_id: i32,
    delay: Duration,
) -> KafkaResult<()>
Set broker round-trip-time delay in milliseconds. Use -1 for all brokers, or >= 0 for a specific broker.
pub fn broker_rack(&self, broker_id: i32, rack: &str) -> KafkaResult<()>
Sets the broker’s rack as reported in Metadata to the client. Use -1 for all brokers, or >= 0 for a specific broker.
pub fn coordinator(
    &self,
    coordinator: MockCoordinator,
    broker_id: i32,
) -> KafkaResult<()>
Explicitly sets the coordinator.
If this API is not used, a standard hashing scheme will be used.
broker_id does not need to point to an existing broker.
pub fn apiversion(
    &self,
    api_key: RDKafkaApiKey,
    min_version: Option<i16>,
    max_version: Option<i16>,
) -> KafkaResult<()>
Set the allowed ApiVersion range for the given ApiKey.
Set both min_version and max_version to None to disable the API completely.
max_version MUST NOT exceed the maximum implemented value.