use std::time::Duration;
use clap::{App, Arg};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use crate::example_utils::setup_logger;
mod example_utils;
async fn produce(brokers: &str, topic_name: &str) {
let producer: &FutureProducer = &ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let futures = (0..5)
.map(|i| async move {
let delivery_status = producer
.send(
FutureRecord::to(topic_name)
.payload(&format!("Message {}", i))
.key(&format!("Key {}", i))
.headers(OwnedHeaders::new().add("header_key", "header_value")),
Duration::from_secs(0),
)
.await;
info!("Delivery status for message {} received", i);
delivery_status
})
.collect::<Vec<_>>();
for future in futures {
info!("Future completed. Result: {:?}", future.await);
}
}
#[tokio::main]
async fn main() {
let matches = App::new("producer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line producer")
.arg(
Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"),
)
.arg(
Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true),
)
.arg(
Arg::with_name("topic")
.short("t")
.long("topic")
.help("Destination topic")
.takes_value(true)
.required(true),
)
.get_matches();
setup_logger(true, matches.value_of("log-conf"));
let (version_n, version_s) = get_rdkafka_version();
info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
let topic = matches.value_of("topic").unwrap();
let brokers = matches.value_of("brokers").unwrap();
produce(brokers, topic).await;
}