madsim_rdkafka/std/statistics.rs
1//! Client and broker statistics.
2//!
3//! These statistics are collected automatically by librdkafka when the client
4//! is configured with a non-zero `statistics.interval.ms`. They are made
5//! available via the [`ClientContext::stats`] callback.
6//!
7//! Refer to the [librdkafka statistics documentation][librdkafka-stats] for
8//! details.
9//!
10//! [`ClientContext::stats`]: crate::ClientContext::stats
11//! [librdkafka-stats]: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
12
13use std::collections::HashMap;
14
15use serde::Deserialize;
16
17/// Overall statistics.
18#[derive(Deserialize, Debug, Default, Clone)]
19pub struct Statistics {
20 /// The name of the librdkafka handle.
21 pub name: String,
22 /// The configured `client.id`.
23 pub client_id: String,
24 /// The instance type (producer or consumer).
25 #[serde(rename = "type")]
26 pub client_type: String,
27 /// The current value of librdkafka's internal monotonic clock, in
28 // microseconds since start.
29 pub ts: i64,
30 /// Wall clock time, in seconds since the Unix epoch.
31 pub time: i64,
32 /// Time since this client instance was created, in microseconds.
33 pub age: i64,
34 /// The number of operations (callbacks, events, etc.) waiting in queue.
35 pub replyq: i64,
36 /// The current number of messages in producer queues.
37 pub msg_cnt: u64,
38 /// The current total size of messages in producer queues.
39 pub msg_size: u64,
40 /// The maximum number of messages allowed in the producer queues.
41 pub msg_max: u64,
42 /// The maximum total size of messages allowed in the producer queues.
43 pub msg_size_max: u64,
44 /// The total number of requests sent to brokers.
45 pub tx: i64,
46 /// The total number of bytes transmitted to brokers.
47 pub tx_bytes: i64,
48 /// The total number of responses received from brokers.
49 pub rx: i64,
50 /// The total number of bytes received from brokers.
51 pub rx_bytes: i64,
52 /// The total number of messages transmitted (produced) to brokers.
53 pub txmsgs: i64,
54 /// The total number of bytes transmitted (produced) to brokers.
55 pub txmsg_bytes: i64,
56 /// The total number of messages consumed from brokers, not including
57 /// ignored messages.
58 pub rxmsgs: i64,
59 /// The total number of bytes (including framing) consumed from brokers.
60 pub rxmsg_bytes: i64,
61 /// Internal tracking of legacy vs. new consumer API state.
62 pub simple_cnt: i64,
63 /// Number of topics in the metadata cache.
64 pub metadata_cache_cnt: i64,
65 /// Per-broker statistics.
66 pub brokers: HashMap<String, Broker>,
67 /// Per-topic statistics.
68 pub topics: HashMap<String, Topic>,
69 /// Consumer group statistics.
70 pub cgrp: Option<ConsumerGroup>,
71 /// Exactly-once semantics and idempotent producer statistics.
72 pub eos: Option<ExactlyOnceSemantics>,
73}
74
75/// Per-broker statistics.
76#[derive(Deserialize, Debug, Default, Clone)]
77pub struct Broker {
78 /// The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
79 pub name: String,
80 /// The broker ID (-1 for bootstraps).
81 pub nodeid: i32,
82 /// The broker hostname and port.
83 pub nodename: String,
84 /// The broker source (learned, configured, internal, or logical).
85 pub source: String,
86 /// The broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY,
87 /// AUTH_HANDSHAKE, UP, UPDATE).
88 pub state: String,
89 /// The time since the last broker state change, in microseconds.
90 pub stateage: i64,
91 /// The number of requests awaiting transmission to the broker.
92 pub outbuf_cnt: i64,
93 /// The number of messages awaiting transmission to the broker.
94 pub outbuf_msg_cnt: i64,
95 /// The number of requests in-flight to the broker that are awaiting a
96 /// response.
97 pub waitresp_cnt: i64,
98 /// The number of messages in-flight to the broker that are awaiting a
99 /// response.
100 pub waitresp_msg_cnt: i64,
101 /// The total number of requests sent to the broker.
102 pub tx: u64,
103 /// The total number of bytes sent to the broker.
104 pub txbytes: u64,
105 /// The total number of transmission errors.
106 pub txerrs: u64,
107 /// The total number of request retries.
108 pub txretries: u64,
109 /// Microseconds since last socket send, or -1 if no sends yet for the
110 /// current connection.
111 pub txidle: i64,
112 /// The total number of requests that timed out.
113 pub req_timeouts: u64,
114 /// The total number of responses received from the broker.
115 pub rx: u64,
116 /// The total number of bytes received from the broker.
117 pub rxbytes: u64,
118 /// The total number of receive errors.
119 pub rxerrs: u64,
120 /// The number of unmatched correlation IDs in response, typically for
121 /// timed out requests.
122 pub rxcorriderrs: u64,
123 /// The total number of partial message sets received. The broker may return
124 /// partial responses if the full message set could not fit in the remaining
125 /// fetch response size.
126 pub rxpartial: u64,
127 /// Microseconds since last socket receive, or -1 if no receives yet for the
128 /// current connection.
129 pub rxidle: i64,
130 /// Request type counters. The object key is the name of the request type
131 /// and the value is the number of requests of that type that have been
132 /// sent.
133 pub req: HashMap<String, i64>,
134 /// The total number of decompression buffer size increases.
135 pub zbuf_grow: u64,
136 /// The total number of buffer size increases (deprecated and unused).
137 pub buf_grow: u64,
138 /// The number of broker thread poll wakeups.
139 pub wakeups: Option<u64>,
140 /// The number of connection attempts, including successful and failed
141 /// attempts, and name resolution failures.
142 pub connects: Option<i64>,
143 /// The number of disconnections, whether triggered by the broker, the
144 /// network, the load balancer, or something else.
145 pub disconnects: Option<i64>,
146 /// Rolling window statistics for the internal producer queue latency, in
147 /// microseconds.
148 pub int_latency: Option<Window>,
149 /// Rolling window statistics for the internal request queue latency, in
150 /// microseconds.
151 ///
152 /// This is the time between when a request is enqueued on the transmit
153 /// (outbuf) queue and the time the request is written to the TCP socket.
154 /// Additional buffering and latency may be incurred by the TCP stack and
155 /// network.
156 pub outbuf_latency: Option<Window>,
157 /// Rolling window statistics for the broker latency/round-trip time,
158 /// in microseconds.
159 pub rtt: Option<Window>,
160 /// Rolling window statistics for the broker throttling time, in
161 /// milliseconds.
162 pub throttle: Option<Window>,
163 /// The partitions that are handled by this broker handle.
164 pub toppars: HashMap<String, TopicPartition>,
165}
166
167/// Rolling window statistics.
168///
169/// These values are not exact; they are sampled estimates maintained by an
170/// HDR histogram in librdkafka.
171#[derive(Deserialize, Debug, Default, Clone)]
172pub struct Window {
173 /// The smallest value.
174 pub min: i64,
175 /// The largest value.
176 pub max: i64,
177 /// The mean value.
178 pub avg: i64,
179 /// The sum of all values.
180 pub sum: i64,
181 /// The total number of values.
182 pub cnt: i64,
183 /// The standard deviation.
184 pub stddev: i64,
185 /// The memory size of the underlying HDR histogram.
186 pub hdrsize: i64,
187 /// The 50th percentile.
188 pub p50: i64,
189 /// The 75th percentile.
190 pub p75: i64,
191 /// The 90th percentile.
192 pub p90: i64,
193 /// The 95th percentile.
194 pub p95: i64,
195 /// The 99th percentile.
196 pub p99: i64,
197 /// The 99.99th percentile.
198 pub p99_99: i64,
199 /// The number of values not included in the underlying histogram because
200 /// they were out of range.
201 pub outofrange: i64,
202}
203
204/// A topic and partition specifier.
205#[derive(Deserialize, Debug, Default, Clone)]
206pub struct TopicPartition {
207 /// The name of the topic.
208 pub topic: String,
209 /// The ID of the partition.
210 pub partition: i32,
211}
212
213/// Per-topic statistics.
214#[derive(Deserialize, Debug, Default, Clone)]
215pub struct Topic {
216 /// The name of the topic.
217 pub topic: String,
218 /// The age of the client's metadata for this topic, in milliseconds.
219 pub metadata_age: i64,
220 /// Rolling window statistics for batch sizes, in bytes.
221 pub batchsize: Window,
222 /// Rolling window statistics for batch message counts.
223 pub batchcnt: Window,
224 /// Per-partition statistics.
225 pub partitions: HashMap<i32, Partition>,
226}
227
228/// Per-partition statistics.
229#[derive(Deserialize, Debug, Default, Clone)]
230pub struct Partition {
231 /// The partition ID.
232 pub partition: i32,
233 /// The ID of the broker from which messages are currently being fetched.
234 pub broker: i32,
235 /// The broker ID of the leader.
236 pub leader: i32,
237 /// Whether the partition is explicitly desired by the application.
238 pub desired: bool,
239 /// Whether the partition is not seen in the topic metadata from the broker.
240 pub unknown: bool,
241 /// The number of messages waiting to be produced in the first-level queue.
242 pub msgq_cnt: i64,
243 /// The number of bytes waiting to be produced in the first-level queue.
244 pub msgq_bytes: u64,
245 /// The number of messages ready to be produced in the transmit queue.
246 pub xmit_msgq_cnt: i64,
247 /// The number of bytes ready to be produced in the transmit queue.
248 pub xmit_msgq_bytes: u64,
249 /// The number of prefetched messages in the fetch queue.
250 pub fetchq_cnt: i64,
251 /// The number of bytes in the fetch queue.
252 pub fetchq_size: u64,
253 /// The consumer fetch state for this partition (none, stopping, stopped,
254 /// offset-query, offset-wait, active).
255 pub fetch_state: String,
256 /// The current/last logical offset query.
257 pub query_offset: i64,
258 /// The next offset to fetch.
259 pub next_offset: i64,
260 /// The offset of the last message passed to the application, plus one.
261 pub app_offset: i64,
262 /// The offset to be committed.
263 pub stored_offset: i64,
264 /// The last committed offset.
265 pub committed_offset: i64,
266 /// The last offset for which partition EOF was signaled.
267 pub eof_offset: i64,
268 /// The low watermark offset on the broker.
269 pub lo_offset: i64,
270 /// The high watermark offset on the broker.
271 pub hi_offset: i64,
272 /// The last stable offset on the broker.
273 pub ls_offset: i64,
274 /// The difference between `hi_offset` and `committed_offset`.
275 pub consumer_lag: i64,
276 /// The difference between `hi_offset` and `stored_offset`.
277 pub consumer_lag_stored: i64,
278 /// The total number of messages transmitted (produced).
279 pub txmsgs: u64,
280 /// The total number of bytes transmitted (produced).
281 pub txbytes: u64,
282 /// The total number of messages consumed, not included ignored messages.
283 pub rxmsgs: u64,
284 /// The total bytes consumed.
285 pub rxbytes: u64,
286 /// The total number of messages received, for consumers, or the total
287 /// number of messages produced, for producers.
288 pub msgs: u64,
289 /// The number of dropped outdated messages.
290 pub rx_ver_drops: u64,
291 /// The current number of messages in flight to or from the broker.
292 pub msgs_inflight: i64,
293 /// The next expected acked sequence number, for idempotent producers.
294 pub next_ack_seq: i64,
295 /// The next expected errored sequence number, for idempotent producers.
296 pub next_err_seq: i64,
297 /// The last acked internal message ID, for idempotent producers.
298 pub acked_msgid: u64,
299}
300
301/// Consumer group manager statistics.
302#[derive(Deserialize, Debug, Default, Clone)]
303pub struct ConsumerGroup {
304 /// The local consumer group handler's state.
305 pub state: String,
306 /// The time elapsed since the last state change, in milliseconds.
307 pub stateage: i64,
308 /// The local consumer group handler's join state.
309 pub join_state: String,
310 /// The time elapsed since the last rebalance (assign or revoke), in
311 /// milliseconds.
312 pub rebalance_age: i64,
313 /// The total number of rebalances (assign or revoke).
314 pub rebalance_cnt: i64,
315 /// The reason for the last rebalance.
316 ///
317 /// This string will be empty if no rebalances have occurred.
318 pub rebalance_reason: String,
319 /// The partition count for the current assignment.
320 pub assignment_size: i32,
321}
322
323/// Exactly-once semantics statistics.
324#[derive(Deserialize, Debug, Default, Clone)]
325pub struct ExactlyOnceSemantics {
326 /// The current idempotent producer state.
327 pub idemp_state: String,
328 /// THe time elapsed since the last idempotent producer state change, in
329 /// milliseconds.
330 pub idemp_stateage: i64,
331 /// The current transactional producer state.
332 pub txn_state: String,
333 /// The time elapsed since the last transactional producer state change, in
334 /// milliseconds.
335 pub txn_stateage: i64,
336 /// Whether the transactional state allows enqueuing (producing) new
337 /// messages.
338 pub txn_may_enq: bool,
339 /// The currently assigned producer ID, or -1.
340 pub producer_id: i64,
341 /// The current epoch, or -1.
342 pub producer_epoch: i64,
343 /// The number of producer ID assignments.
344 pub epoch_cnt: i64,
345}
346
#[cfg(test)]
mod tests {
    use maplit::hashmap;

    use super::*;

    /// Deserializes the librdkafka example document and checks every section:
    /// top-level totals, broker statistics (including rolling windows and
    /// toppars), topic statistics, and per-partition statistics.
    #[test]
    fn test_statistics() {
        let stats: Statistics = serde_json::from_str(EXAMPLE).unwrap();

        // Top-level fields.
        assert_eq!(stats.name, "rdkafka#producer-1");
        assert_eq!(stats.client_id, "rdkafka");
        assert_eq!(stats.client_type, "producer");
        assert_eq!(stats.ts, 1163982743268);
        assert_eq!(stats.time, 1589652530);
        assert_eq!(stats.age, 5);
        assert_eq!(stats.replyq, 0);
        assert_eq!(stats.msg_cnt, 320);
        assert_eq!(stats.msg_size, 9920);
        assert_eq!(stats.msg_max, 500000);
        assert_eq!(stats.msg_size_max, 1073741824);
        assert_eq!(stats.tx, 31311);
        assert_eq!(stats.tx_bytes, 463869957);
        assert_eq!(stats.rx, 31310);
        assert_eq!(stats.rx_bytes, 1753668);
        assert_eq!(stats.txmsgs, 11853650);
        assert_eq!(stats.txmsg_bytes, 367463150);
        assert_eq!(stats.rxmsgs, 0);
        assert_eq!(stats.rxmsg_bytes, 0);
        assert_eq!(stats.simple_cnt, 0);
        assert_eq!(stats.metadata_cache_cnt, 1);

        // The example has no `cgrp` or `eos` sections; missing `Option`
        // fields must deserialize to `None` rather than failing.
        assert!(stats.cgrp.is_none());
        assert!(stats.eos.is_none());

        // Broker statistics. Look the broker up by key instead of relying on
        // map iteration order.
        assert_eq!(stats.brokers.len(), 1);
        let broker = &stats.brokers["localhost:9092/0"];

        assert_eq!(broker.name, "localhost:9092/0");
        assert_eq!(broker.nodeid, 0);
        assert_eq!(broker.nodename, "localhost:9092");
        assert_eq!(broker.source, "configured");
        assert_eq!(broker.state, "UP");
        assert_eq!(broker.stateage, 8005652);
        assert_eq!(broker.waitresp_cnt, 1);
        assert_eq!(broker.waitresp_msg_cnt, 126);
        assert_eq!(broker.tx, 31311);
        assert_eq!(broker.txbytes, 463869957);
        assert_eq!(broker.txidle, 5);
        assert_eq!(broker.rx, 31310);
        assert_eq!(broker.rxbytes, 1753668);
        assert_eq!(broker.rxidle, 5);
        assert_eq!(broker.wakeups, Some(131568));
        assert_eq!(broker.connects, Some(1));
        assert_eq!(broker.disconnects, Some(0));

        // Rolling windows nested inside the broker.
        assert_eq!(broker.int_latency.as_ref().map(|w| w.cnt), Some(1443154));
        assert_eq!(broker.outbuf_latency.as_ref().map(|w| w.max), Some(308));
        assert_eq!(broker.rtt.as_ref().map(|w| w.p99), Some(1183));
        assert_eq!(broker.rtt.as_ref().map(|w| w.p99_99), Some(3279));
        assert_eq!(broker.throttle.as_ref().map(|w| w.hdrsize), Some(17520));

        assert_eq!(
            broker.req,
            hashmap! {
                "Produce".to_string() => 31307,
                "Offset".to_string() => 0,
                "Metadata".to_string() => 2,
                "FindCoordinator".to_string() => 0,
                "SaslHandshake".to_string() => 0,
                "ApiVersion".to_string() => 2,
                "InitProducerId".to_string() => 0,
                "AddPartitionsToTxn".to_string() => 0,
                "AddOffsetsToTxn".to_string() => 0,
                "EndTxn".to_string() => 0,
                "TxnOffsetCommit".to_string() => 0,
                "SaslAuthenticate".to_string() => 0,
            }
        );

        assert_eq!(broker.toppars.len(), 3);
        let toppar = &broker.toppars["test-1"];
        assert_eq!(toppar.topic, "test");
        assert_eq!(toppar.partition, 1);

        // Topic statistics.
        assert_eq!(stats.topics.len(), 1);
        let topic = &stats.topics["test"];

        assert_eq!(topic.topic, "test");
        assert_eq!(topic.metadata_age, 7014);
        assert_eq!(topic.batchsize.max, 240276);
        assert_eq!(topic.batchsize.p99, 72191);
        assert_eq!(topic.batchcnt.avg, 304);
        assert_eq!(topic.batchcnt.cnt, 4739);

        // Partition keys are JSON strings ("0", "-1") parsed into `i32`.
        assert_eq!(topic.partitions.len(), 4);

        let p0 = &topic.partitions[&0];
        assert_eq!(p0.partition, 0);
        assert_eq!(p0.msgq_cnt, 845);
        assert_eq!(p0.msgq_bytes, 26195);
        assert_eq!(p0.fetch_state, "none");
        assert_eq!(p0.committed_offset, -1001);
        assert_eq!(p0.consumer_lag, -1);
        assert_eq!(p0.txmsgs, 3950967);
        assert_eq!(p0.msgs, 3951812);
        assert_eq!(p0.msgs_inflight, 1067);
        assert!(!p0.desired);

        let p1 = &topic.partitions[&1];
        assert_eq!(p1.msgq_bytes, 7099);
        assert_eq!(p1.txbytes, 122470336);

        let unassigned = &topic.partitions[&-1];
        assert_eq!(unassigned.partition, -1);
        assert_eq!(unassigned.broker, -1);
        assert_eq!(unassigned.leader, -1);
        assert_eq!(unassigned.msgs, 500000);
    }

    // Example from https://github.com/edenhill/librdkafka/wiki/Statistics
    //
    // Each partition carries both a misspelled `commited_offset` key and
    // `committed_offset`. Only the latter maps to a field; the former is
    // ignored as an unknown key during deserialization.
    const EXAMPLE: &str = r#"
    {
        "name": "rdkafka#producer-1",
        "client_id": "rdkafka",
        "type": "producer",
        "ts": 1163982743268,
        "time": 1589652530,
        "age": 5,
        "replyq": 0,
        "msg_cnt": 320,
        "msg_size": 9920,
        "msg_max": 500000,
        "msg_size_max": 1073741824,
        "simple_cnt": 0,
        "metadata_cache_cnt": 1,
        "brokers": {
            "localhost:9092/0": {
                "name": "localhost:9092/0",
                "nodeid": 0,
                "nodename": "localhost:9092",
                "source": "configured",
                "state": "UP",
                "stateage": 8005652,
                "outbuf_cnt": 0,
                "outbuf_msg_cnt": 0,
                "waitresp_cnt": 1,
                "waitresp_msg_cnt": 126,
                "tx": 31311,
                "txbytes": 463869957,
                "txerrs": 0,
                "txretries": 0,
                "txidle": 5,
                "req_timeouts": 0,
                "rx": 31310,
                "rxbytes": 1753668,
                "rxerrs": 0,
                "rxcorriderrs": 0,
                "rxpartial": 0,
                "rxidle": 5,
                "zbuf_grow": 0,
                "buf_grow": 0,
                "wakeups": 131568,
                "connects": 1,
                "disconnects": 0,
                "int_latency": {
                    "min": 2,
                    "max": 9193,
                    "avg": 605,
                    "sum": 874202325,
                    "stddev": 1080,
                    "p50": 319,
                    "p75": 481,
                    "p90": 1135,
                    "p95": 3023,
                    "p99": 5919,
                    "p99_99": 9087,
                    "outofrange": 0,
                    "hdrsize": 15472,
                    "cnt": 1443154
                },
                "outbuf_latency": {
                    "min": 1,
                    "max": 308,
                    "avg": 22,
                    "sum": 107311,
                    "stddev": 21,
                    "p50": 22,
                    "p75": 29,
                    "p90": 36,
                    "p95": 44,
                    "p99": 111,
                    "p99_99": 309,
                    "outofrange": 0,
                    "hdrsize": 11376,
                    "cnt": 4740
                },
                "rtt": {
                    "min": 94,
                    "max": 3279,
                    "avg": 237,
                    "sum": 1124867,
                    "stddev": 198,
                    "p50": 193,
                    "p75": 245,
                    "p90": 329,
                    "p95": 393,
                    "p99": 1183,
                    "p99_99": 3279,
                    "outofrange": 0,
                    "hdrsize": 13424,
                    "cnt": 4739
                },
                "throttle": {
                    "min": 0,
                    "max": 0,
                    "avg": 0,
                    "sum": 0,
                    "stddev": 0,
                    "p50": 0,
                    "p75": 0,
                    "p90": 0,
                    "p95": 0,
                    "p99": 0,
                    "p99_99": 0,
                    "outofrange": 0,
                    "hdrsize": 17520,
                    "cnt": 4739
                },
                "req": {
                    "Produce": 31307,
                    "Offset": 0,
                    "Metadata": 2,
                    "FindCoordinator": 0,
                    "SaslHandshake": 0,
                    "ApiVersion": 2,
                    "InitProducerId": 0,
                    "AddPartitionsToTxn": 0,
                    "AddOffsetsToTxn": 0,
                    "EndTxn": 0,
                    "TxnOffsetCommit": 0,
                    "SaslAuthenticate": 0
                },
                "toppars": {
                    "test-0": {
                        "topic": "test",
                        "partition": 0
                    },
                    "test-1": {
                        "topic": "test",
                        "partition": 1
                    },
                    "test-2": {
                        "topic": "test",
                        "partition": 2
                    }
                }
            }
        },
        "topics": {
            "test": {
                "topic": "test",
                "metadata_age": 7014,
                "batchsize": {
                    "min": 99,
                    "max": 240276,
                    "avg": 11871,
                    "sum": 56260370,
                    "stddev": 13137,
                    "p50": 10431,
                    "p75": 11583,
                    "p90": 12799,
                    "p95": 13823,
                    "p99": 72191,
                    "p99_99": 240639,
                    "outofrange": 0,
                    "hdrsize": 14448,
                    "cnt": 4739
                },
                "batchcnt": {
                    "min": 1,
                    "max": 6161,
                    "avg": 304,
                    "sum": 1442353,
                    "stddev": 336,
                    "p50": 267,
                    "p75": 297,
                    "p90": 329,
                    "p95": 353,
                    "p99": 1847,
                    "p99_99": 6175,
                    "outofrange": 0,
                    "hdrsize": 8304,
                    "cnt": 4739
                },
                "partitions": {
                    "0": {
                        "partition": 0,
                        "broker": 0,
                        "leader": 0,
                        "desired": false,
                        "unknown": false,
                        "msgq_cnt": 845,
                        "msgq_bytes": 26195,
                        "xmit_msgq_cnt": 0,
                        "xmit_msgq_bytes": 0,
                        "fetchq_cnt": 0,
                        "fetchq_size": 0,
                        "fetch_state": "none",
                        "query_offset": -1001,
                        "next_offset": 0,
                        "app_offset": -1001,
                        "stored_offset": -1001,
                        "commited_offset": -1001,
                        "committed_offset": -1001,
                        "eof_offset": -1001,
                        "lo_offset": -1001,
                        "hi_offset": -1001,
                        "ls_offset": -1001,
                        "consumer_lag": -1,
                        "consumer_lag_stored": 0,
                        "txmsgs": 3950967,
                        "txbytes": 122479977,
                        "rxmsgs": 0,
                        "rxbytes": 0,
                        "msgs": 3951812,
                        "rx_ver_drops": 0,
                        "msgs_inflight": 1067,
                        "next_ack_seq": 0,
                        "next_err_seq": 0,
                        "acked_msgid": 0
                    },
                    "1": {
                        "partition": 1,
                        "broker": 0,
                        "leader": 0,
                        "desired": false,
                        "unknown": false,
                        "msgq_cnt": 229,
                        "msgq_bytes": 7099,
                        "xmit_msgq_cnt": 0,
                        "xmit_msgq_bytes": 0,
                        "fetchq_cnt": 0,
                        "fetchq_size": 0,
                        "fetch_state": "none",
                        "query_offset": -1001,
                        "next_offset": 0,
                        "app_offset": -1001,
                        "stored_offset": -1001,
                        "commited_offset": -1001,
                        "committed_offset": -1001,
                        "eof_offset": -1001,
                        "lo_offset": -1001,
                        "hi_offset": -1001,
                        "ls_offset": -1001,
                        "consumer_lag": -1,
                        "consumer_lag_stored": 0,
                        "txmsgs": 3950656,
                        "txbytes": 122470336,
                        "rxmsgs": 0,
                        "rxbytes": 0,
                        "msgs": 3952618,
                        "rx_ver_drops": 0,
                        "msgs_inflight": 0,
                        "next_ack_seq": 0,
                        "next_err_seq": 0,
                        "acked_msgid": 0
                    },
                    "2": {
                        "partition": 2,
                        "broker": 0,
                        "leader": 0,
                        "desired": false,
                        "unknown": false,
                        "msgq_cnt": 1816,
                        "msgq_bytes": 56296,
                        "xmit_msgq_cnt": 0,
                        "xmit_msgq_bytes": 0,
                        "fetchq_cnt": 0,
                        "fetchq_size": 0,
                        "fetch_state": "none",
                        "query_offset": -1001,
                        "next_offset": 0,
                        "app_offset": -1001,
                        "stored_offset": -1001,
                        "commited_offset": -1001,
                        "committed_offset": -1001,
                        "eof_offset": -1001,
                        "lo_offset": -1001,
                        "hi_offset": -1001,
                        "ls_offset": -1001,
                        "consumer_lag": -1,
                        "consumer_lag_stored": 0,
                        "txmsgs": 3952027,
                        "txbytes": 122512837,
                        "rxmsgs": 0,
                        "rxbytes": 0,
                        "msgs": 3953855,
                        "rx_ver_drops": 0,
                        "msgs_inflight": 0,
                        "next_ack_seq": 0,
                        "next_err_seq": 0,
                        "acked_msgid": 0
                    },
                    "-1": {
                        "partition": -1,
                        "broker": -1,
                        "leader": -1,
                        "desired": false,
                        "unknown": false,
                        "msgq_cnt": 0,
                        "msgq_bytes": 0,
                        "xmit_msgq_cnt": 0,
                        "xmit_msgq_bytes": 0,
                        "fetchq_cnt": 0,
                        "fetchq_size": 0,
                        "fetch_state": "none",
                        "query_offset": -1001,
                        "next_offset": 0,
                        "app_offset": -1001,
                        "stored_offset": -1001,
                        "commited_offset": -1001,
                        "committed_offset": -1001,
                        "eof_offset": -1001,
                        "lo_offset": -1001,
                        "hi_offset": -1001,
                        "ls_offset": -1001,
                        "consumer_lag": -1,
                        "consumer_lag_stored": 0,
                        "txmsgs": 0,
                        "txbytes": 0,
                        "rxmsgs": 0,
                        "rxbytes": 0,
                        "msgs": 500000,
                        "rx_ver_drops": 0,
                        "msgs_inflight": 0,
                        "next_ack_seq": 0,
                        "next_err_seq": 0,
                        "acked_msgid": 0
                    }
                }
            }
        },
        "tx": 31311,
        "tx_bytes": 463869957,
        "rx": 31310,
        "rx_bytes": 1753668,
        "txmsgs": 11853650,
        "txmsg_bytes": 367463150,
        "rxmsgs": 0,
        "rxmsg_bytes": 0
    }"#;
}
725}