madsim_rdkafka/std/
statistics.rs

1//! Client and broker statistics.
2//!
3//! These statistics are collected automatically by librdkafka when the client
4//! is configured with a non-zero `statistics.interval.ms`. They are made
5//! available via the [`ClientContext::stats`] callback.
6//!
7//! Refer to the [librdkafka statistics documentation][librdkafka-stats] for
8//! details.
9//!
10//! [`ClientContext::stats`]: crate::ClientContext::stats
11//! [librdkafka-stats]: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
12
13use std::collections::HashMap;
14
15use serde::Deserialize;
16
17/// Overall statistics.
18#[derive(Deserialize, Debug, Default, Clone)]
19pub struct Statistics {
20    /// The name of the librdkafka handle.
21    pub name: String,
22    /// The configured `client.id`.
23    pub client_id: String,
24    /// The instance type (producer or consumer).
25    #[serde(rename = "type")]
26    pub client_type: String,
27    /// The current value of librdkafka's internal monotonic clock, in
28    // microseconds since start.
29    pub ts: i64,
30    /// Wall clock time, in seconds since the Unix epoch.
31    pub time: i64,
32    /// Time since this client instance was created, in microseconds.
33    pub age: i64,
34    /// The number of operations (callbacks, events, etc.) waiting in queue.
35    pub replyq: i64,
36    /// The current number of messages in producer queues.
37    pub msg_cnt: u64,
38    /// The current total size of messages in producer queues.
39    pub msg_size: u64,
40    /// The maximum number of messages allowed in the producer queues.
41    pub msg_max: u64,
42    /// The maximum total size of messages allowed in the producer queues.
43    pub msg_size_max: u64,
44    /// The total number of requests sent to brokers.
45    pub tx: i64,
46    /// The total number of bytes transmitted to brokers.
47    pub tx_bytes: i64,
48    /// The total number of responses received from brokers.
49    pub rx: i64,
50    /// The total number of bytes received from brokers.
51    pub rx_bytes: i64,
52    /// The total number of messages transmitted (produced) to brokers.
53    pub txmsgs: i64,
54    /// The total number of bytes transmitted (produced) to brokers.
55    pub txmsg_bytes: i64,
56    /// The total number of messages consumed from brokers, not including
57    /// ignored messages.
58    pub rxmsgs: i64,
59    /// The total number of bytes (including framing) consumed from brokers.
60    pub rxmsg_bytes: i64,
61    /// Internal tracking of legacy vs. new consumer API state.
62    pub simple_cnt: i64,
63    /// Number of topics in the metadata cache.
64    pub metadata_cache_cnt: i64,
65    /// Per-broker statistics.
66    pub brokers: HashMap<String, Broker>,
67    /// Per-topic statistics.
68    pub topics: HashMap<String, Topic>,
69    /// Consumer group statistics.
70    pub cgrp: Option<ConsumerGroup>,
71    /// Exactly-once semantics and idempotent producer statistics.
72    pub eos: Option<ExactlyOnceSemantics>,
73}
74
75/// Per-broker statistics.
76#[derive(Deserialize, Debug, Default, Clone)]
77pub struct Broker {
78    /// The broker hostname, port, and ID, in the form `HOSTNAME:PORT/ID`.
79    pub name: String,
80    /// The broker ID (-1 for bootstraps).
81    pub nodeid: i32,
82    /// The broker hostname and port.
83    pub nodename: String,
84    /// The broker source (learned, configured, internal, or logical).
85    pub source: String,
86    /// The broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY,
87    /// AUTH_HANDSHAKE, UP, UPDATE).
88    pub state: String,
89    /// The time since the last broker state change, in microseconds.
90    pub stateage: i64,
91    /// The number of requests awaiting transmission to the broker.
92    pub outbuf_cnt: i64,
93    /// The number of messages awaiting transmission to the broker.
94    pub outbuf_msg_cnt: i64,
95    /// The number of requests in-flight to the broker that are awaiting a
96    /// response.
97    pub waitresp_cnt: i64,
98    /// The number of messages in-flight to the broker that are awaiting a
99    /// response.
100    pub waitresp_msg_cnt: i64,
101    /// The total number of requests sent to the broker.
102    pub tx: u64,
103    /// The total number of bytes sent to the broker.
104    pub txbytes: u64,
105    /// The total number of transmission errors.
106    pub txerrs: u64,
107    /// The total number of request retries.
108    pub txretries: u64,
109    /// Microseconds since last socket send, or -1 if no sends yet for the
110    /// current connection.
111    pub txidle: i64,
112    /// The total number of requests that timed out.
113    pub req_timeouts: u64,
114    /// The total number of responses received from the broker.
115    pub rx: u64,
116    /// The total number of bytes received from the broker.
117    pub rxbytes: u64,
118    /// The total number of receive errors.
119    pub rxerrs: u64,
120    /// The number of unmatched correlation IDs in response, typically for
121    /// timed out requests.
122    pub rxcorriderrs: u64,
123    /// The total number of partial message sets received. The broker may return
124    /// partial responses if the full message set could not fit in the remaining
125    /// fetch response size.
126    pub rxpartial: u64,
127    /// Microseconds since last socket receive, or -1 if no receives yet for the
128    /// current connection.
129    pub rxidle: i64,
130    /// Request type counters. The object key is the name of the request type
131    /// and the value is the number of requests of that type that have been
132    /// sent.
133    pub req: HashMap<String, i64>,
134    /// The total number of decompression buffer size increases.
135    pub zbuf_grow: u64,
136    /// The total number of buffer size increases (deprecated and unused).
137    pub buf_grow: u64,
138    /// The number of broker thread poll wakeups.
139    pub wakeups: Option<u64>,
140    /// The number of connection attempts, including successful and failed
141    /// attempts, and name resolution failures.
142    pub connects: Option<i64>,
143    /// The number of disconnections, whether triggered by the broker, the
144    /// network, the load balancer, or something else.
145    pub disconnects: Option<i64>,
146    /// Rolling window statistics for the internal producer queue latency, in
147    /// microseconds.
148    pub int_latency: Option<Window>,
149    /// Rolling window statistics for the internal request queue latency, in
150    /// microseconds.
151    ///
152    /// This is the time between when a request is enqueued on the transmit
153    /// (outbuf) queue and the time the request is written to the TCP socket.
154    /// Additional buffering and latency may be incurred by the TCP stack and
155    /// network.
156    pub outbuf_latency: Option<Window>,
157    /// Rolling window statistics for the broker latency/round-trip time,
158    /// in microseconds.
159    pub rtt: Option<Window>,
160    /// Rolling window statistics for the broker throttling time, in
161    /// milliseconds.
162    pub throttle: Option<Window>,
163    /// The partitions that are handled by this broker handle.
164    pub toppars: HashMap<String, TopicPartition>,
165}
166
167/// Rolling window statistics.
168///
169/// These values are not exact; they are sampled estimates maintained by an
170/// HDR histogram in librdkafka.
171#[derive(Deserialize, Debug, Default, Clone)]
172pub struct Window {
173    /// The smallest value.
174    pub min: i64,
175    /// The largest value.
176    pub max: i64,
177    /// The mean value.
178    pub avg: i64,
179    /// The sum of all values.
180    pub sum: i64,
181    /// The total number of values.
182    pub cnt: i64,
183    /// The standard deviation.
184    pub stddev: i64,
185    /// The memory size of the underlying HDR histogram.
186    pub hdrsize: i64,
187    /// The 50th percentile.
188    pub p50: i64,
189    /// The 75th percentile.
190    pub p75: i64,
191    /// The 90th percentile.
192    pub p90: i64,
193    /// The 95th percentile.
194    pub p95: i64,
195    /// The 99th percentile.
196    pub p99: i64,
197    /// The 99.99th percentile.
198    pub p99_99: i64,
199    /// The number of values not included in the underlying histogram because
200    /// they were out of range.
201    pub outofrange: i64,
202}
203
204/// A topic and partition specifier.
205#[derive(Deserialize, Debug, Default, Clone)]
206pub struct TopicPartition {
207    /// The name of the topic.
208    pub topic: String,
209    /// The ID of the partition.
210    pub partition: i32,
211}
212
213/// Per-topic statistics.
214#[derive(Deserialize, Debug, Default, Clone)]
215pub struct Topic {
216    /// The name of the topic.
217    pub topic: String,
218    /// The age of the client's metadata for this topic, in milliseconds.
219    pub metadata_age: i64,
220    /// Rolling window statistics for batch sizes, in bytes.
221    pub batchsize: Window,
222    /// Rolling window statistics for batch message counts.
223    pub batchcnt: Window,
224    /// Per-partition statistics.
225    pub partitions: HashMap<i32, Partition>,
226}
227
228/// Per-partition statistics.
229#[derive(Deserialize, Debug, Default, Clone)]
230pub struct Partition {
231    /// The partition ID.
232    pub partition: i32,
233    /// The ID of the broker from which messages are currently being fetched.
234    pub broker: i32,
235    /// The broker ID of the leader.
236    pub leader: i32,
237    /// Whether the partition is explicitly desired by the application.
238    pub desired: bool,
239    /// Whether the partition is not seen in the topic metadata from the broker.
240    pub unknown: bool,
241    /// The number of messages waiting to be produced in the first-level queue.
242    pub msgq_cnt: i64,
243    /// The number of bytes waiting to be produced in the first-level queue.
244    pub msgq_bytes: u64,
245    /// The number of messages ready to be produced in the transmit queue.
246    pub xmit_msgq_cnt: i64,
247    /// The number of bytes ready to be produced in the transmit queue.
248    pub xmit_msgq_bytes: u64,
249    /// The number of prefetched messages in the fetch queue.
250    pub fetchq_cnt: i64,
251    /// The number of bytes in the fetch queue.
252    pub fetchq_size: u64,
253    /// The consumer fetch state for this partition (none, stopping, stopped,
254    /// offset-query, offset-wait, active).
255    pub fetch_state: String,
256    /// The current/last logical offset query.
257    pub query_offset: i64,
258    /// The next offset to fetch.
259    pub next_offset: i64,
260    /// The offset of the last message passed to the application, plus one.
261    pub app_offset: i64,
262    /// The offset to be committed.
263    pub stored_offset: i64,
264    /// The last committed offset.
265    pub committed_offset: i64,
266    /// The last offset for which partition EOF was signaled.
267    pub eof_offset: i64,
268    /// The low watermark offset on the broker.
269    pub lo_offset: i64,
270    /// The high watermark offset on the broker.
271    pub hi_offset: i64,
272    /// The last stable offset on the broker.
273    pub ls_offset: i64,
274    /// The difference between `hi_offset` and `committed_offset`.
275    pub consumer_lag: i64,
276    /// The difference between `hi_offset` and `stored_offset`.
277    pub consumer_lag_stored: i64,
278    /// The total number of messages transmitted (produced).
279    pub txmsgs: u64,
280    /// The total number of bytes transmitted (produced).
281    pub txbytes: u64,
282    /// The total number of messages consumed, not included ignored messages.
283    pub rxmsgs: u64,
284    /// The total bytes consumed.
285    pub rxbytes: u64,
286    /// The total number of messages received, for consumers, or the total
287    /// number of messages produced, for producers.
288    pub msgs: u64,
289    /// The number of dropped outdated messages.
290    pub rx_ver_drops: u64,
291    /// The current number of messages in flight to or from the broker.
292    pub msgs_inflight: i64,
293    /// The next expected acked sequence number, for idempotent producers.
294    pub next_ack_seq: i64,
295    /// The next expected errored sequence number, for idempotent producers.
296    pub next_err_seq: i64,
297    /// The last acked internal message ID, for idempotent producers.
298    pub acked_msgid: u64,
299}
300
301/// Consumer group manager statistics.
302#[derive(Deserialize, Debug, Default, Clone)]
303pub struct ConsumerGroup {
304    /// The local consumer group handler's state.
305    pub state: String,
306    /// The time elapsed since the last state change, in milliseconds.
307    pub stateage: i64,
308    /// The local consumer group handler's join state.
309    pub join_state: String,
310    /// The time elapsed since the last rebalance (assign or revoke), in
311    /// milliseconds.
312    pub rebalance_age: i64,
313    /// The total number of rebalances (assign or revoke).
314    pub rebalance_cnt: i64,
315    /// The reason for the last rebalance.
316    ///
317    /// This string will be empty if no rebalances have occurred.
318    pub rebalance_reason: String,
319    /// The partition count for the current assignment.
320    pub assignment_size: i32,
321}
322
323/// Exactly-once semantics statistics.
324#[derive(Deserialize, Debug, Default, Clone)]
325pub struct ExactlyOnceSemantics {
326    /// The current idempotent producer state.
327    pub idemp_state: String,
328    /// THe time elapsed since the last idempotent producer state change, in
329    /// milliseconds.
330    pub idemp_stateage: i64,
331    /// The current transactional producer state.
332    pub txn_state: String,
333    /// The time elapsed since the last transactional producer state change, in
334    /// milliseconds.
335    pub txn_stateage: i64,
336    /// Whether the transactional state allows enqueuing (producing) new
337    /// messages.
338    pub txn_may_enq: bool,
339    /// The currently assigned producer ID, or -1.
340    pub producer_id: i64,
341    /// The current epoch, or -1.
342    pub producer_epoch: i64,
343    /// The number of producer ID assignments.
344    pub epoch_cnt: i64,
345}
346
#[cfg(test)]
mod tests {
    use maplit::hashmap;

    use super::*;

    /// Deserializes the example payload and spot-checks values at every level
    /// of the model: client-wide counters, the single broker, its latency
    /// windows and toppars, and the topic with its partitions.
    #[test]
    fn test_statistics() {
        let stats: Statistics = serde_json::from_str(EXAMPLE).unwrap();

        assert_eq!(stats.name, "rdkafka#producer-1");
        assert_eq!(stats.client_id, "rdkafka");
        assert_eq!(stats.client_type, "producer");
        assert_eq!(stats.ts, 1163982743268);
        assert_eq!(stats.time, 1589652530);
        assert_eq!(stats.age, 5);
        assert_eq!(stats.replyq, 0);
        assert_eq!(stats.msg_cnt, 320);
        assert_eq!(stats.msg_size, 9920);
        assert_eq!(stats.msg_max, 500000);
        assert_eq!(stats.msg_size_max, 1073741824);
        assert_eq!(stats.simple_cnt, 0);
        assert_eq!(stats.metadata_cache_cnt, 1);
        assert_eq!(stats.tx, 31311);
        assert_eq!(stats.tx_bytes, 463869957);
        assert_eq!(stats.rx, 31310);
        assert_eq!(stats.rx_bytes, 1753668);
        assert_eq!(stats.txmsgs, 11853650);
        assert_eq!(stats.txmsg_bytes, 367463150);
        assert_eq!(stats.rxmsgs, 0);
        assert_eq!(stats.rxmsg_bytes, 0);

        // The example payload has no `cgrp` or `eos` section.
        assert!(stats.cgrp.is_none());
        assert!(stats.eos.is_none());

        assert_eq!(stats.brokers.len(), 1);

        // Exactly one broker is present (asserted above), so take it directly
        // instead of collecting the values into a temporary Vec.
        let broker = stats
            .brokers
            .values()
            .next()
            .expect("example contains exactly one broker");

        assert_eq!(broker.name, "localhost:9092/0");
        assert_eq!(broker.nodeid, 0);
        assert_eq!(broker.state, "UP");
        assert_eq!(broker.toppars.len(), 3);
        assert_eq!(broker.rtt.as_ref().map(|w| w.min), Some(94));

        assert_eq!(
            broker.req,
            hashmap! {
                "Produce".to_string() => 31307,
                "Offset".to_string() => 0,
                "Metadata".to_string() => 2,
                "FindCoordinator".to_string() => 0,
                "SaslHandshake".to_string() => 0,
                "ApiVersion".to_string() => 2,
                "InitProducerId".to_string() => 0,
                "AddPartitionsToTxn".to_string() => 0,
                "AddOffsetsToTxn".to_string() => 0,
                "EndTxn".to_string() => 0,
                "TxnOffsetCommit".to_string() => 0,
                "SaslAuthenticate".to_string() => 0,
            }
        );

        assert_eq!(stats.topics.len(), 1);

        let topic = stats
            .topics
            .get("test")
            .expect("example contains the `test` topic");

        assert_eq!(topic.topic, "test");
        assert_eq!(topic.metadata_age, 7014);
        assert_eq!(topic.batchsize.max, 240276);
        assert_eq!(topic.partitions.len(), 4);
        assert_eq!(topic.partitions[&0].msgq_cnt, 845);
        assert_eq!(topic.partitions[&0].txmsgs, 3950967);
        assert_eq!(topic.partitions[&-1].msgs, 500000);
    }

    // Example from https://github.com/edenhill/librdkafka/wiki/Statistics
    const EXAMPLE: &str = r#"
      {
        "name": "rdkafka#producer-1",
        "client_id": "rdkafka",
        "type": "producer",
        "ts": 1163982743268,
        "time": 1589652530,
        "age": 5,
        "replyq": 0,
        "msg_cnt": 320,
        "msg_size": 9920,
        "msg_max": 500000,
        "msg_size_max": 1073741824,
        "simple_cnt": 0,
        "metadata_cache_cnt": 1,
        "brokers": {
          "localhost:9092/0": {
            "name": "localhost:9092/0",
            "nodeid": 0,
            "nodename": "localhost:9092",
            "source": "configured",
            "state": "UP",
            "stateage": 8005652,
            "outbuf_cnt": 0,
            "outbuf_msg_cnt": 0,
            "waitresp_cnt": 1,
            "waitresp_msg_cnt": 126,
            "tx": 31311,
            "txbytes": 463869957,
            "txerrs": 0,
            "txretries": 0,
            "txidle": 5,
            "req_timeouts": 0,
            "rx": 31310,
            "rxbytes": 1753668,
            "rxerrs": 0,
            "rxcorriderrs": 0,
            "rxpartial": 0,
            "rxidle": 5,
            "zbuf_grow": 0,
            "buf_grow": 0,
            "wakeups": 131568,
            "connects": 1,
            "disconnects": 0,
            "int_latency": {
              "min": 2,
              "max": 9193,
              "avg": 605,
              "sum": 874202325,
              "stddev": 1080,
              "p50": 319,
              "p75": 481,
              "p90": 1135,
              "p95": 3023,
              "p99": 5919,
              "p99_99": 9087,
              "outofrange": 0,
              "hdrsize": 15472,
              "cnt": 1443154
            },
            "outbuf_latency": {
              "min": 1,
              "max": 308,
              "avg": 22,
              "sum": 107311,
              "stddev": 21,
              "p50": 22,
              "p75": 29,
              "p90": 36,
              "p95": 44,
              "p99": 111,
              "p99_99": 309,
              "outofrange": 0,
              "hdrsize": 11376,
              "cnt": 4740
            },
            "rtt": {
              "min": 94,
              "max": 3279,
              "avg": 237,
              "sum": 1124867,
              "stddev": 198,
              "p50": 193,
              "p75": 245,
              "p90": 329,
              "p95": 393,
              "p99": 1183,
              "p99_99": 3279,
              "outofrange": 0,
              "hdrsize": 13424,
              "cnt": 4739
            },
            "throttle": {
              "min": 0,
              "max": 0,
              "avg": 0,
              "sum": 0,
              "stddev": 0,
              "p50": 0,
              "p75": 0,
              "p90": 0,
              "p95": 0,
              "p99": 0,
              "p99_99": 0,
              "outofrange": 0,
              "hdrsize": 17520,
              "cnt": 4739
            },
            "req": {
              "Produce": 31307,
              "Offset": 0,
              "Metadata": 2,
              "FindCoordinator": 0,
              "SaslHandshake": 0,
              "ApiVersion": 2,
              "InitProducerId": 0,
              "AddPartitionsToTxn": 0,
              "AddOffsetsToTxn": 0,
              "EndTxn": 0,
              "TxnOffsetCommit": 0,
              "SaslAuthenticate": 0
            },
            "toppars": {
              "test-0": {
                "topic": "test",
                "partition": 0
              },
              "test-1": {
                "topic": "test",
                "partition": 1
              },
              "test-2": {
                "topic": "test",
                "partition": 2
              }
            }
          }
        },
        "topics": {
          "test": {
            "topic": "test",
            "metadata_age": 7014,
            "batchsize": {
              "min": 99,
              "max": 240276,
              "avg": 11871,
              "sum": 56260370,
              "stddev": 13137,
              "p50": 10431,
              "p75": 11583,
              "p90": 12799,
              "p95": 13823,
              "p99": 72191,
              "p99_99": 240639,
              "outofrange": 0,
              "hdrsize": 14448,
              "cnt": 4739
            },
            "batchcnt": {
              "min": 1,
              "max": 6161,
              "avg": 304,
              "sum": 1442353,
              "stddev": 336,
              "p50": 267,
              "p75": 297,
              "p90": 329,
              "p95": 353,
              "p99": 1847,
              "p99_99": 6175,
              "outofrange": 0,
              "hdrsize": 8304,
              "cnt": 4739
            },
            "partitions": {
              "0": {
                "partition": 0,
                "broker": 0,
                "leader": 0,
                "desired": false,
                "unknown": false,
                "msgq_cnt": 845,
                "msgq_bytes": 26195,
                "xmit_msgq_cnt": 0,
                "xmit_msgq_bytes": 0,
                "fetchq_cnt": 0,
                "fetchq_size": 0,
                "fetch_state": "none",
                "query_offset": -1001,
                "next_offset": 0,
                "app_offset": -1001,
                "stored_offset": -1001,
                "commited_offset": -1001,
                "committed_offset": -1001,
                "eof_offset": -1001,
                "lo_offset": -1001,
                "hi_offset": -1001,
                "ls_offset": -1001,
                "consumer_lag": -1,
                "consumer_lag_stored": 0,
                "txmsgs": 3950967,
                "txbytes": 122479977,
                "rxmsgs": 0,
                "rxbytes": 0,
                "msgs": 3951812,
                "rx_ver_drops": 0,
                "msgs_inflight": 1067,
                "next_ack_seq": 0,
                "next_err_seq": 0,
                "acked_msgid": 0
              },
              "1": {
                "partition": 1,
                "broker": 0,
                "leader": 0,
                "desired": false,
                "unknown": false,
                "msgq_cnt": 229,
                "msgq_bytes": 7099,
                "xmit_msgq_cnt": 0,
                "xmit_msgq_bytes": 0,
                "fetchq_cnt": 0,
                "fetchq_size": 0,
                "fetch_state": "none",
                "query_offset": -1001,
                "next_offset": 0,
                "app_offset": -1001,
                "stored_offset": -1001,
                "commited_offset": -1001,
                "committed_offset": -1001,
                "eof_offset": -1001,
                "lo_offset": -1001,
                "hi_offset": -1001,
                "ls_offset": -1001,
                "consumer_lag": -1,
                "consumer_lag_stored": 0,
                "txmsgs": 3950656,
                "txbytes": 122470336,
                "rxmsgs": 0,
                "rxbytes": 0,
                "msgs": 3952618,
                "rx_ver_drops": 0,
                "msgs_inflight": 0,
                "next_ack_seq": 0,
                "next_err_seq": 0,
                "acked_msgid": 0
              },
              "2": {
                "partition": 2,
                "broker": 0,
                "leader": 0,
                "desired": false,
                "unknown": false,
                "msgq_cnt": 1816,
                "msgq_bytes": 56296,
                "xmit_msgq_cnt": 0,
                "xmit_msgq_bytes": 0,
                "fetchq_cnt": 0,
                "fetchq_size": 0,
                "fetch_state": "none",
                "query_offset": -1001,
                "next_offset": 0,
                "app_offset": -1001,
                "stored_offset": -1001,
                "commited_offset": -1001,
                "committed_offset": -1001,
                "eof_offset": -1001,
                "lo_offset": -1001,
                "hi_offset": -1001,
                "ls_offset": -1001,
                "consumer_lag": -1,
                "consumer_lag_stored": 0,
                "txmsgs": 3952027,
                "txbytes": 122512837,
                "rxmsgs": 0,
                "rxbytes": 0,
                "msgs": 3953855,
                "rx_ver_drops": 0,
                "msgs_inflight": 0,
                "next_ack_seq": 0,
                "next_err_seq": 0,
                "acked_msgid": 0
              },
              "-1": {
                "partition": -1,
                "broker": -1,
                "leader": -1,
                "desired": false,
                "unknown": false,
                "msgq_cnt": 0,
                "msgq_bytes": 0,
                "xmit_msgq_cnt": 0,
                "xmit_msgq_bytes": 0,
                "fetchq_cnt": 0,
                "fetchq_size": 0,
                "fetch_state": "none",
                "query_offset": -1001,
                "next_offset": 0,
                "app_offset": -1001,
                "stored_offset": -1001,
                "commited_offset": -1001,
                "committed_offset": -1001,
                "eof_offset": -1001,
                "lo_offset": -1001,
                "hi_offset": -1001,
                "ls_offset": -1001,
                "consumer_lag": -1,
                "consumer_lag_stored": 0,
                "txmsgs": 0,
                "txbytes": 0,
                "rxmsgs": 0,
                "rxbytes": 0,
                "msgs": 500000,
                "rx_ver_drops": 0,
                "msgs_inflight": 0,
                "next_ack_seq": 0,
                "next_err_seq": 0,
                "acked_msgid": 0
              }
            }
          }
        },
        "tx": 31311,
        "tx_bytes": 463869957,
        "rx": 31310,
        "rx_bytes": 1753668,
        "txmsgs": 11853650,
        "txmsg_bytes": 367463150,
        "rxmsgs": 0,
        "rxmsg_bytes": 0
      }"#;
}