
KIP-714: Keep your Kafka Clusters Close, and your Kafka Clients Closer
For many years, the only way to capture Kafka client metrics was through JMX. Starting with Kafka 3.7.0, thanks to KIP-714, it is now possible to pull client metrics from your Kafka clusters using OpenTelemetry. This blog post explains how KIP-714 works, what you have to do to collect metrics, and gives you working code to test things out.
kafka-client-metrics.sh that can be found in the /bin folder of your Kafka distribution.
λ riferrei ~ → kafka-client-metrics.sh
This tool helps to manipulate and describe client metrics configurations.
Option                                   Description
------                                   -----------
--alter                                  Alter the configuration for the client
                                           metrics resource.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client.
--delete                                 Delete the configuration for the
                                           client metrics resource.
--describe                               List configurations for the client
                                           metrics resource.
--generate-name                          Generate a UUID to use as the name.
--help                                   Print usage information.
--interval <Integer: push interval>      The metrics push interval in
                                           milliseconds.
--list                                   List the client metrics resources.
--match <String: k1=v1,k2=v2>            Matching selector 'k1=v1,k2=v2'. The
                                           following is a list of valid
                                           selector names:
                                         client_id
                                         client_instance_id
                                         client_software_name
                                         client_software_version
                                         client_source_address
                                         client_source_port
--metrics <String: m1,m2>                Telemetry metric name prefixes 'm1,m2'.
--name <String: name>                    Name of client metrics configuration
                                           resource.
--version                                Display Kafka version.
$KAFKA_HOME/bin/kafka-client-metrics.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --metrics org.apache.kafka.producer.,org.apache.kafka.consumer. \
  --alter --generate-name --interval 1000
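According to the tool's help text, the --metrics option takes metric name prefixes rather than full metric names, so the two values above are meant to cover every metric whose name begins with org.apache.kafka.producer. or org.apache.kafka.consumer. The Java sketch below only illustrates that prefix semantics. The class and method names are hypothetical and are not part of Kafka, where the actual selection takes place.

```java
import java.util.List;

// Hypothetical helper, not part of Kafka: illustrates prefix-based selection.
class MetricPrefixFilter {

    // Returns true when the metric name starts with at least one subscribed prefix.
    static boolean isSelected(String metricName, List<String> prefixes) {
        for (String prefix : prefixes) {
            if (metricName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same prefixes as the --metrics option in the command above.
        List<String> prefixes = List.of(
                "org.apache.kafka.producer.",
                "org.apache.kafka.consumer.");

        // A producer metric name falls under the first prefix.
        System.out.println(isSelected("org.apache.kafka.producer.record.send.rate", prefixes));
        // A made-up name outside both prefixes is not selected.
        System.out.println(isSelected("com.example.other.metric", prefixes));
    }
}
```

Because the match is on prefixes, a narrower value such as org.apache.kafka.producer.record. would restrict the subscription to a smaller family of metrics.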
org.apache.kafka.common.metrics.MetricsReporter interface. This is what the interface looks like.
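public interface MetricsReporter extends Reconfigurable, AutoCloseable {
    void init(List<KafkaMetric> metrics);      // called once with the metrics that already exist
    void configure(Map<String, ?> configs);    // receives the configuration key-value pairs
    void metricChange(KafkaMetric metric);     // called when a metric is added or updated
    void metricRemoval(KafkaMetric metric);    // called when a metric is removed
    void close();                              // called when the metrics repository is closed
}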
init(), configure(), and close() for this purpose.
MetricsReporter interface.
Implementing the MetricsReporter interface is the first step. This is your way to extend Kafka's plugin framework to implement a reporter. However, to allow your Kafka brokers to receive metrics from the clients, your code also needs to implement the org.apache.kafka.server.telemetry.ClientTelemetry interface.
public interface ClientTelemetry {
    ClientTelemetryReceiver clientReceiver();
}
ClientTelemetryReceiver. This is the component that processes the metrics serialized in the OpenTelemetry metrics format and sends them to a compatible endpoint.
public interface ClientTelemetryReceiver {
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}
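Since exportMetrics() is invoked by the broker, one way to keep it cheap is to hand the serialized payload to a bounded in-memory queue and let a separate thread do the slow network call. The sketch below shows only that hand-off, using plain byte arrays instead of the Kafka types so that it compiles without Kafka on the classpath. The class name, the method names, and the drop-when-full policy are assumptions made for illustration, not something Kafka or the demo project prescribes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical hand-off buffer that an exportMetrics() implementation could delegate to.
class NonBlockingHandOff {

    private final BlockingQueue<byte[]> queue;
    private final AtomicLong dropped = new AtomicLong();

    NonBlockingHandOff(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Meant to be called from exportMetrics(): offer() never blocks.
    // When the queue is full, the payload is dropped and counted instead.
    boolean submit(byte[] serializedMetrics) {
        boolean accepted = queue.offer(serializedMetrics);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Meant to be called by a separate sender thread, which is free to do slow I/O.
    // Returns null when nothing is waiting.
    byte[] nextForSending() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        NonBlockingHandOff handOff = new NonBlockingHandOff(2);
        handOff.submit(new byte[] {1});
        handOff.submit(new byte[] {2});
        boolean third = handOff.submit(new byte[] {3}); // queue is already full

        System.out.println("third accepted: " + third);
        System.out.println("dropped: " + handOff.droppedCount());
    }
}
```

Dropping under pressure trades completeness for broker safety. Whether that is acceptable depends on how the metrics are used, so treat the policy as one option among several.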
ClientTelemetryReceiver to be as lightweight as possible. Chances are, this component will be cached by the Kafka broker and reused across several invocations. Moreover, sending metrics to the endpoint must use non-blocking APIs to ensure that Kafka won't end up with busy threads.
ClientTelemetryReceiver interface.
$KAFKA_HOME/libs folder. You must also tell Kafka which metrics reporter to use. You can do this by setting the metric.reporters property on the Kafka broker. Here is how you can do this via environment variables.
export KAFKA_METRIC_REPORTERS=com.riferrei.kafka.KIP714MetricReporter
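An environment variable like this one takes effect when the broker runs from a container image that translates variables prefixed with KAFKA_ into broker properties. The sketch below shows the simple form of that naming convention (drop the prefix, lowercase, turn underscores into dots), under the assumption that the image in use follows it. The helper is hypothetical and ignores any escaping rules a particular image may define.

```java
import java.util.Locale;

// Hypothetical helper: maps a KAFKA_-prefixed environment variable name
// to the broker property key it is assumed to stand for.
class EnvToProperty {

    private static final String PREFIX = "KAFKA_";

    static String toPropertyKey(String envName) {
        if (!envName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected a name starting with " + PREFIX);
        }
        return envName.substring(PREFIX.length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    public static void main(String[] args) {
        // Prints: metric.reporters
        System.out.println(toPropertyKey("KAFKA_METRIC_REPORTERS"));
    }
}
```

On a broker started directly from the Kafka distribution, the equivalent is a metric.reporters entry in the broker properties file.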
git clone https://github.com/riferrei/kafka-client-metrics-to-cloudwatch-with-kip-714.git
cd kafka-client-metrics-to-cloudwatch-with-kip-714
~/.aws directory for the default profile. Follow the instructions detailed here to create your own set of credentials.
kafka that is your Kafka broker. The other one is called collector, which is the OpenTelemetry collector functioning as a sidecar to push the collected client metrics to the observability backend. Start these containers with the following command.
docker compose up -d
load-test already created. This is the topic you will use to verify that the client metrics are being collected. Moreover, the Kafka broker is already configured to allow the collection of both producer and consumer metrics. All you need to do to make this happen is write records to Kafka and read them back.
load-test. You don't need to write a producer application for this. Kafka includes the kafka-producer-perf-test.sh utility tool exactly for this purpose. To load 50K records of 1 KB each at a rate of 1,000 records per second, use the following command.
kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:9092 --throughput 1000 --num-records 50000 --record-size 1024 --topic load-test --print-metrics
4999 records sent, 999.4 records/sec (0.98 MB/sec), 8.2 ms avg latency, 292.0 ms max latency.
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 1.2 ms avg latency, 21.0 ms max latency.
5002 records sent, 999.2 records/sec (0.98 MB/sec), 1.1 ms avg latency, 11.0 ms max latency.
5007 records sent, 1000.4 records/sec (0.98 MB/sec), 1.2 ms avg latency, 19.0 ms max latency.
5007 records sent, 1000.8 records/sec (0.98 MB/sec), 1.4 ms avg latency, 14.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.5 ms avg latency, 21.0 ms max latency.
4993 records sent, 998.2 records/sec (0.97 MB/sec), 1.6 ms avg latency, 22.0 ms max latency.
5013 records sent, 1002.2 records/sec (0.98 MB/sec), 1.6 ms avg latency, 13.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.5 ms avg latency, 7.0 ms max latency.
50000 records sent, 999.880014 records/sec (0.98 MB/sec), 2.10 ms avg latency, 292.00 ms max latency, 1 ms 50th, 4 ms 95th, 10 ms 99th, 146 ms 99.9th.
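The figures in this output can be sanity-checked from the command-line arguments alone. The sketch below assumes that the tool counts 1 MB as 1024 * 1024 bytes, which is consistent with the 0.98 MB/sec it reports, and that the --throughput cap is the limiting factor. The class and method names are made up for the example.

```java
import java.util.Locale;

// Hypothetical calculator for the expected numbers of the producer load test.
class ProducerLoadEstimate {

    // Expected throughput in MB/sec, counting 1 MB as 1024 * 1024 bytes.
    static double megabytesPerSecond(int recordsPerSecond, int recordSizeBytes) {
        return (double) recordsPerSecond * recordSizeBytes / (1024.0 * 1024.0);
    }

    // Expected run time in seconds when the throughput cap is the bottleneck.
    static double durationSeconds(int totalRecords, int recordsPerSecond) {
        return (double) totalRecords / recordsPerSecond;
    }

    public static void main(String[] args) {
        // Values taken from --throughput, --record-size, and --num-records.
        double mbPerSec = megabytesPerSecond(1000, 1024);
        double seconds = durationSeconds(50_000, 1000);

        System.out.println(String.format(Locale.ROOT, "%.2f MB/sec", mbPerSec)); // 0.98 MB/sec
        System.out.println(String.format(Locale.ROOT, "%.0f seconds", seconds)); // 50 seconds
    }
}
```

A run of about 50 seconds also fits the progress lines above, each of which covers roughly 5,000 records.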
Metric Name Value
app-info:commit-id:{client-id=perf-producer-client} : 2ae524ed625438c5
app-info:start-time-ms:{client-id=perf-producer-client} : 1719256696165
app-info:version:{client-id=perf-producer-client} : 3.7.0
kafka-metrics-count:count:{client-id=perf-producer-client} : 112.000
producer-metrics:batch-size-avg:{client-id=perf-producer-client} : 2478.732
producer-metrics:batch-size-max:{client-id=perf-producer-client} : 15556.000
producer-metrics:batch-split-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:batch-split-total:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-available-bytes:{client-id=perf-producer-client} : 33554432.000
producer-metrics:buffer-exhausted-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-exhausted-total:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-total-bytes:{client-id=perf-producer-client} : 33554432.000
producer-metrics:bufferpool-wait-ratio:{client-id=perf-producer-client} : 0.000
producer-metrics:bufferpool-wait-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:bufferpool-wait-time-total:{client-id=perf-producer-client} : 0.000
producer-metrics:compression-rate-avg:{client-id=perf-producer-client} : 1.000
producer-metrics:connection-close-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:connection-close-total:{client-id=perf-producer-client} : 0.000
producer-metrics:connection-count:{client-id=perf-producer-client} : 2.000
producer-metrics:connection-creation-rate:{client-id=perf-producer-client} : 0.040
producer-metrics:connection-creation-total:{client-id=perf-producer-client} : 2.000
producer-metrics:failed-authentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-authentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-reauthentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-reauthentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:flush-time-ns-total:{client-id=perf-producer-client} : 1757375.000
producer-metrics:incoming-byte-rate:{client-id=perf-producer-client} : 25718.753
producer-metrics:incoming-byte-total:{client-id=perf-producer-client} : 1283803.000
producer-metrics:io-ratio:{client-id=perf-producer-client} : 0.052
producer-metrics:io-time-ns-avg:{client-id=perf-producer-client} : 42781.231
producer-metrics:io-time-ns-total:{client-id=perf-producer-client} : 2614831638.000
producer-metrics:io-wait-ratio:{client-id=perf-producer-client} : 0.897
producer-metrics:io-wait-time-ns-avg:{client-id=perf-producer-client} : 732721.341
producer-metrics:io-wait-time-ns-total:{client-id=perf-producer-client} : 44784661091.000
producer-metrics:io-waittime-total:{client-id=perf-producer-client} : 44784661091.000
producer-metrics:iotime-total:{client-id=perf-producer-client} : 2614831638.000
producer-metrics:metadata-age:{client-id=perf-producer-client} : 49.888
producer-metrics:metadata-wait-time-ns-total:{client-id=perf-producer-client} : 123723791.000
producer-metrics:network-io-rate:{client-id=perf-producer-client} : 858.070
producer-metrics:network-io-total:{client-id=perf-producer-client} : 42834.000
producer-metrics:outgoing-byte-rate:{client-id=perf-producer-client} : 1097380.516
producer-metrics:outgoing-byte-total:{client-id=perf-producer-client} : 54780138.000
producer-metrics:produce-throttle-time-avg:{client-id=perf-producer-client} : 0.000
producer-metrics:produce-throttle-time-max:{client-id=perf-producer-client} : 0.000
producer-metrics:reauthentication-latency-avg:{client-id=perf-producer-client} : NaN
producer-metrics:reauthentication-latency-max:{client-id=perf-producer-client} : NaN
producer-metrics:record-error-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:record-error-total:{client-id=perf-producer-client} : 0.000
producer-metrics:record-queue-time-avg:{client-id=perf-producer-client} : 0.074
producer-metrics:record-queue-time-max:{client-id=perf-producer-client} : 23.000
producer-metrics:record-retry-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:record-retry-total:{client-id=perf-producer-client} : 0.000
producer-metrics:record-send-rate:{client-id=perf-producer-client} : 1002.587
producer-metrics:record-send-total:{client-id=perf-producer-client} : 50000.000
producer-metrics:record-size-avg:{client-id=perf-producer-client} : 1110.000
producer-metrics:record-size-max:{client-id=perf-producer-client} : 1110.000
producer-metrics:records-per-request-avg:{client-id=perf-producer-client} : 2.340
producer-metrics:request-latency-avg:{client-id=perf-producer-client} : 1.458
producer-metrics:request-latency-max:{client-id=perf-producer-client} : 25.000
producer-metrics:request-rate:{client-id=perf-producer-client} : 429.035
producer-metrics:request-size-avg:{client-id=perf-producer-client} : 2557.788
producer-metrics:request-size-max:{client-id=perf-producer-client} : 15619.000
producer-metrics:request-total:{client-id=perf-producer-client} : 21417.000
producer-metrics:requests-in-flight:{client-id=perf-producer-client} : 0.000
producer-metrics:response-rate:{client-id=perf-producer-client} : 429.052
producer-metrics:response-total:{client-id=perf-producer-client} : 21417.000
producer-metrics:select-rate:{client-id=perf-producer-client} : 1224.109
producer-metrics:select-total:{client-id=perf-producer-client} : 61121.000
producer-metrics:successful-authentication-no-reauth-total:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-authentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-authentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-reauthentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-reauthentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-abort-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-begin-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-commit-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-init-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-send-offsets-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:waiting-threads:{client-id=perf-producer-client} : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=perf-producer-client, node-id=node--1} : 30.731
producer-node-metrics:incoming-byte-rate:{client-id=perf-producer-client, node-id=node-1} : 25709.654
producer-node-metrics:incoming-byte-total:{client-id=perf-producer-client, node-id=node--1} : 1534.000
producer-node-metrics:incoming-byte-total:{client-id=perf-producer-client, node-id=node-1} : 1282269.000
producer-node-metrics:outgoing-byte-rate:{client-id=perf-producer-client, node-id=node--1} : 9636.932
producer-node-metrics:outgoing-byte-rate:{client-id=perf-producer-client, node-id=node-1} : 1088659.542
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node--1} : 481066.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 54299072.000
producer-node-metrics:request-latency-avg:{client-id=perf-producer-client, node-id=node--1} : NaN
producer-node-metrics:request-latency-avg:{client-id=perf-producer-client, node-id=node-1} : 1.458
producer-node-metrics:request-latency-max:{client-id=perf-producer-client, node-id=node--1} : NaN
producer-node-metrics:request-latency-max:{client-id=perf-producer-client, node-id=node-1} : 25.000
producer-node-metrics:request-rate:{client-id=perf-producer-client, node-id=node--1} : 1.062
producer-node-metrics:request-rate:{client-id=perf-producer-client, node-id=node-1} : 428.334
producer-node-metrics:request-size-avg:{client-id=perf-producer-client, node-id=node--1} : 9076.717
producer-node-metrics:request-size-avg:{client-id=perf-producer-client, node-id=node-1} : 2541.615
producer-node-metrics:request-size-max:{client-id=perf-producer-client, node-id=node--1} : 9813.000
producer-node-metrics:request-size-max:{client-id=perf-producer-client, node-id=node-1} : 15619.000
producer-node-metrics:request-total:{client-id=perf-producer-client, node-id=node--1} : 53.000
producer-node-metrics:request-total:{client-id=perf-producer-client, node-id=node-1} : 21364.000
producer-node-metrics:response-rate:{client-id=perf-producer-client, node-id=node--1} : 1.062
producer-node-metrics:response-rate:{client-id=perf-producer-client, node-id=node-1} : 428.351
producer-node-metrics:response-total:{client-id=perf-producer-client, node-id=node--1} : 53.000
producer-node-metrics:response-total:{client-id=perf-producer-client, node-id=node-1} : 21364.000
producer-topic-metrics:byte-rate:{client-id=perf-producer-client, topic=load-test} : 1061823.601
producer-topic-metrics:byte-total:{client-id=perf-producer-client, topic=load-test} : 52953143.000
producer-topic-metrics:compression-rate:{client-id=perf-producer-client, topic=load-test} : 1.000
producer-topic-metrics:record-error-rate:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-error-total:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-retry-rate:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=load-test} : 1002.607
producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=load-test} : 50000.000
kafka-consumer-perf-test.sh utility tool for this purpose. To read the 50K records, use the following command.
kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --messages 50000 --topic load-test --print-metrics
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-06-24 15:24:32:294, 2024-06-24 15:24:33:084, 48.8281, 61.8078, 50000, 63291.1392, 484, 306, 159.5690, 163398.6928
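The columns in this row are related to each other by simple arithmetic, which helps when reading the output. The sketch below recomputes them from three inputs: the 790 ms between start.time and end.time, the reported rebalance.time.ms of 484, and 50,000 messages of 1,024 bytes each (the record size used by the producer test). It assumes 1 MB is counted as 1024 * 1024 bytes and that fetch time is the elapsed time minus the rebalance time, both of which agree with the reported values. The names are hypothetical.

```java
import java.util.Locale;

// Hypothetical recomputation of the kafka-consumer-perf-test.sh summary columns.
class ConsumerPerfColumns {

    // Size of the consumed data in MB, counting 1 MB as 1024 * 1024 bytes.
    static double megabytes(long messages, int bytesPerMessage) {
        return messages * (double) bytesPerMessage / (1024.0 * 1024.0);
    }

    // Converts an amount observed over elapsedMs milliseconds into a per-second rate.
    static double perSecond(double amount, long elapsedMs) {
        return amount * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        long messages = 50_000;
        long elapsedMs = 790;    // 15:24:33:084 minus 15:24:32:294
        long rebalanceMs = 484;  // rebalance.time.ms
        long fetchMs = elapsedMs - rebalanceMs; // fetch.time.ms

        double mb = megabytes(messages, 1024);  // data.consumed.in.MB

        System.out.println("fetch.time.ms  = " + fetchMs);
        System.out.println(String.format(Locale.ROOT, "MB.sec         = %.4f", perSecond(mb, elapsedMs)));
        System.out.println(String.format(Locale.ROOT, "nMsg.sec       = %.4f", perSecond(messages, elapsedMs)));
        System.out.println(String.format(Locale.ROOT, "fetch.MB.sec   = %.4f", perSecond(mb, fetchMs)));
        System.out.println(String.format(Locale.ROOT, "fetch.nMsg.sec = %.4f", perSecond(messages, fetchMs)));
    }
}
```

Note how the fetch rates are much higher than the overall rates, because more than half of this short run was spent on the initial rebalance.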
Metric Name Value
consumer-coordinator-metrics:assigned-partitions:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:commit-latency-avg:{client-id=perf-consumer-client} : 10.000
consumer-coordinator-metrics:commit-latency-max:{client-id=perf-consumer-client} : 10.000
consumer-coordinator-metrics:commit-rate:{client-id=perf-consumer-client} : 0.033
consumer-coordinator-metrics:commit-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:failed-rebalance-rate-per-hour:{client-id=perf-consumer-client} : 347.759
consumer-coordinator-metrics:failed-rebalance-total:{client-id=perf-consumer-client} : 3.000
consumer-coordinator-metrics:heartbeat-rate:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:heartbeat-response-time-max:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:heartbeat-total:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:join-rate:{client-id=perf-consumer-client} : 0.032
consumer-coordinator-metrics:join-time-avg:{client-id=perf-consumer-client} : 9.000
consumer-coordinator-metrics:join-time-max:{client-id=perf-consumer-client} : 9.000
consumer-coordinator-metrics:join-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:last-heartbeat-seconds-ago:{client-id=perf-consumer-client} : -1.000
consumer-coordinator-metrics:last-rebalance-seconds-ago:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-assigned-latency-avg:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-assigned-latency-max:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-lost-latency-avg:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:partition-lost-latency-max:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:partition-revoked-latency-avg:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-revoked-latency-max:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:rebalance-latency-avg:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-latency-max:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-latency-total:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-rate-per-hour:{client-id=perf-consumer-client} : 116.788
consumer-coordinator-metrics:rebalance-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:sync-rate:{client-id=perf-consumer-client} : 0.032
consumer-coordinator-metrics:sync-time-avg:{client-id=perf-consumer-client} : 11.000
consumer-coordinator-metrics:sync-time-max:{client-id=perf-consumer-client} : 11.000
consumer-coordinator-metrics:sync-total:{client-id=perf-consumer-client} : 1.000
consumer-fetch-manager-metrics:bytes-consumed-rate:{client-id=perf-consumer-client, topic=load-test} : 1678364.853
consumer-fetch-manager-metrics:bytes-consumed-rate:{client-id=perf-consumer-client} : 1678364.853
consumer-fetch-manager-metrics:bytes-consumed-total:{client-id=perf-consumer-client, topic=load-test} : 51650000.000
consumer-fetch-manager-metrics:bytes-consumed-total:{client-id=perf-consumer-client} : 51650000.000
consumer-fetch-manager-metrics:fetch-latency-avg:{client-id=perf-consumer-client} : 14.385
consumer-fetch-manager-metrics:fetch-latency-max:{client-id=perf-consumer-client} : 508.000
consumer-fetch-manager-metrics:fetch-rate:{client-id=perf-consumer-client} : 1.689
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=perf-consumer-client, topic=load-test} : 1012745.098
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=perf-consumer-client} : 1012745.098
consumer-fetch-manager-metrics:fetch-size-max:{client-id=perf-consumer-client, topic=load-test} : 1029901.000
consumer-fetch-manager-metrics:fetch-size-max:{client-id=perf-consumer-client} : 1029901.000
consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=perf-consumer-client} : 0.000
consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=perf-consumer-client} : 0.000
consumer-fetch-manager-metrics:fetch-total:{client-id=perf-consumer-client} : 52.000
consumer-fetch-manager-metrics:preferred-read-replica:{client-id=perf-consumer-client, topic=load-test, partition=0} : -1
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=perf-consumer-client, topic=load-test} : 1624.748
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=perf-consumer-client} : 1624.695
consumer-fetch-manager-metrics:records-consumed-total:{client-id=perf-consumer-client, topic=load-test} : 50000.000
consumer-fetch-manager-metrics:records-consumed-total:{client-id=perf-consumer-client} : 50000.000
consumer-fetch-manager-metrics:records-lag-avg:{client-id=perf-consumer-client, topic=load-test, partition=0} : 24515.608
consumer-fetch-manager-metrics:records-lag-max:{client-id=perf-consumer-client, topic=load-test, partition=0} : 49500.000
consumer-fetch-manager-metrics:records-lag-max:{client-id=perf-consumer-client} : 49500.000
consumer-fetch-manager-metrics:records-lag:{client-id=perf-consumer-client, topic=load-test, partition=0} : 0.000
consumer-fetch-manager-metrics:records-lead-avg:{client-id=perf-consumer-client, topic=load-test, partition=0} : 25484.392
consumer-fetch-manager-metrics:records-lead-min:{client-id=perf-consumer-client, topic=load-test, partition=0} : 500.000
consumer-fetch-manager-metrics:records-lead-min:{client-id=perf-consumer-client} : 500.000
consumer-fetch-manager-metrics:records-lead:{client-id=perf-consumer-client, topic=load-test, partition=0} : 50000.000
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=perf-consumer-client, topic=load-test} : 980.392
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=perf-consumer-client} : 980.392
kafka-metrics-count:count:{client-id=perf-consumer-client} : 61.000
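The names printed by --print-metrics use the client's group:name form, such as producer-metrics:record-send-rate, while the subscription created earlier selects metrics by dotted prefixes such as org.apache.kafka.producer. Comparing the two forms in this post suggests a simple pattern: drop the -metrics suffix from the group, turn hyphens into dots, and prepend org.apache.kafka. The sketch below encodes that observed pattern only. It is not the client's actual conversion code, the names are hypothetical, and groups such as app-info may be handled differently.

```java
// Hypothetical converter that mirrors the naming pattern observed in this post.
class TelemetryNameSketch {

    private static final String PREFIX = "org.apache.kafka.";
    private static final String GROUP_SUFFIX = "-metrics";

    // group: for example "producer-metrics"; name: for example "record-send-rate".
    static String toTelemetryName(String group, String name) {
        String domain = group.endsWith(GROUP_SUFFIX)
                ? group.substring(0, group.length() - GROUP_SUFFIX.length())
                : group;
        return PREFIX + domain.replace('-', '.') + "." + name.replace('-', '.');
    }

    public static void main(String[] args) {
        // Prints: org.apache.kafka.producer.record.send.rate
        System.out.println(toTelemetryName("producer-metrics", "record-send-rate"));
        // Prints: org.apache.kafka.consumer.coordinator.commit.latency.avg
        System.out.println(toTelemetryName("consumer-coordinator-metrics", "commit-latency-avg"));
    }
}
```

Tags such as client-id, topic, and partition are not part of the name in this pattern, which is why the same dotted name can appear several times in a backend listing.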
kafka-kip-714. Note that the OpenTelemetry collector from this demo is configured to send all the metrics to the us-east-1 region.
aws cloudwatch list-metrics --region us-east-1 --namespace kafka-kip-714 --output json | jq '.Metrics | .[] | "\(.MetricName)"' | sort
"org.apache.kafka.consumer.commit.sync.time.ns.total"
"org.apache.kafka.consumer.committed.time.ns.total"
"org.apache.kafka.consumer.connection.close.rate"
"org.apache.kafka.consumer.connection.close.total"
"org.apache.kafka.consumer.connection.count"
"org.apache.kafka.consumer.connection.creation.rate"
"org.apache.kafka.consumer.connection.creation.total"
"org.apache.kafka.consumer.coordinator.assigned.partitions"
"org.apache.kafka.consumer.coordinator.commit.latency.avg"
"org.apache.kafka.consumer.coordinator.commit.latency.max"
"org.apache.kafka.consumer.coordinator.commit.rate"
"org.apache.kafka.consumer.coordinator.commit.total"
"org.apache.kafka.consumer.coordinator.failed.rebalance.rate.per.hour"
"org.apache.kafka.consumer.coordinator.failed.rebalance.total"
"org.apache.kafka.consumer.coordinator.heartbeat.rate"
"org.apache.kafka.consumer.coordinator.heartbeat.response.time.max"
"org.apache.kafka.consumer.coordinator.heartbeat.total"
"org.apache.kafka.consumer.coordinator.join.rate"
"org.apache.kafka.consumer.coordinator.join.time.avg"
"org.apache.kafka.consumer.coordinator.join.time.max"
"org.apache.kafka.consumer.coordinator.join.total"
"org.apache.kafka.consumer.coordinator.last.heartbeat.seconds.ago"
"org.apache.kafka.consumer.coordinator.last.rebalance.seconds.ago"
"org.apache.kafka.consumer.coordinator.partition.assigned.latency.avg"
"org.apache.kafka.consumer.coordinator.partition.assigned.latency.max"
"org.apache.kafka.consumer.coordinator.rebalance.latency.avg"
"org.apache.kafka.consumer.coordinator.rebalance.latency.max"
"org.apache.kafka.consumer.coordinator.rebalance.latency.total"
"org.apache.kafka.consumer.coordinator.rebalance.rate.per.hour"
"org.apache.kafka.consumer.coordinator.rebalance.total"
"org.apache.kafka.consumer.coordinator.sync.rate"
"org.apache.kafka.consumer.coordinator.sync.time.avg"
"org.apache.kafka.consumer.coordinator.sync.time.max"
"org.apache.kafka.consumer.coordinator.sync.total"
"org.apache.kafka.consumer.failed.authentication.rate"
"org.apache.kafka.consumer.failed.authentication.total"
"org.apache.kafka.consumer.failed.reauthentication.rate"
"org.apache.kafka.consumer.failed.reauthentication.total"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.total"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.total"
"org.apache.kafka.consumer.fetch.manager.fetch.latency.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.latency.max"
"org.apache.kafka.consumer.fetch.manager.fetch.rate"
"org.apache.kafka.consumer.fetch.manager.fetch.size.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.size.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.size.max"
"org.apache.kafka.consumer.fetch.manager.fetch.size.max"
"org.apache.kafka.consumer.fetch.manager.fetch.throttle.time.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.throttle.time.max"
"org.apache.kafka.consumer.fetch.manager.fetch.total"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.records.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.records.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.records.consumed.total"
"org.apache.kafka.consumer.fetch.manager.records.consumed.total"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.per.request.avg"
"org.apache.kafka.consumer.fetch.manager.records.per.request.avg"
"org.apache.kafka.consumer.incoming.byte.rate"
"org.apache.kafka.consumer.incoming.byte.total"
"org.apache.kafka.consumer.io.ratio"
"org.apache.kafka.consumer.io.time.ns.avg"
"org.apache.kafka.consumer.io.time.ns.total"
"org.apache.kafka.consumer.io.wait.ratio"
"org.apache.kafka.consumer.io.wait.time.ns.avg"
"org.apache.kafka.consumer.io.wait.time.ns.total"
"org.apache.kafka.consumer.io.waittime.total"
"org.apache.kafka.consumer.iotime.total"
"org.apache.kafka.consumer.last.poll.seconds.ago"
"org.apache.kafka.consumer.network.io.rate"
"org.apache.kafka.consumer.network.io.total"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.outgoing.byte.rate"
"org.apache.kafka.consumer.outgoing.byte.total"
"org.apache.kafka.consumer.poll.idle.ratio.avg"
"org.apache.kafka.consumer.request.rate"
"org.apache.kafka.consumer.request.size.avg"
"org.apache.kafka.consumer.request.size.max"
"org.apache.kafka.consumer.request.total"
"org.apache.kafka.consumer.response.rate"
"org.apache.kafka.consumer.response.total"
"org.apache.kafka.consumer.select.rate"
"org.apache.kafka.consumer.select.total"
"org.apache.kafka.consumer.successful.authentication.no.reauth.total"
"org.apache.kafka.consumer.successful.authentication.rate"
"org.apache.kafka.consumer.successful.authentication.total"
"org.apache.kafka.consumer.successful.reauthentication.rate"
"org.apache.kafka.consumer.successful.reauthentication.total"
"org.apache.kafka.consumer.time.between.poll.avg"
"org.apache.kafka.consumer.time.between.poll.max"
"org.apache.kafka.producer.batch.size.avg"
"org.apache.kafka.producer.batch.size.max"
"org.apache.kafka.producer.batch.split.rate"
"org.apache.kafka.producer.batch.split.total"
"org.apache.kafka.producer.buffer.available.bytes"
"org.apache.kafka.producer.buffer.exhausted.rate"
"org.apache.kafka.producer.buffer.exhausted.total"
"org.apache.kafka.producer.buffer.total.bytes"
"org.apache.kafka.producer.bufferpool.wait.ratio"
"org.apache.kafka.producer.bufferpool.wait.time.ns.total"
"org.apache.kafka.producer.bufferpool.wait.time.total"
"org.apache.kafka.producer.compression.rate.avg"
"org.apache.kafka.producer.connection.close.rate"
"org.apache.kafka.producer.connection.close.total"
"org.apache.kafka.producer.connection.count"
"org.apache.kafka.producer.connection.creation.rate"
"org.apache.kafka.producer.connection.creation.total"
"org.apache.kafka.producer.failed.authentication.rate"
"org.apache.kafka.producer.failed.authentication.total"
"org.apache.kafka.producer.failed.reauthentication.rate"
"org.apache.kafka.producer.failed.reauthentication.total"
"org.apache.kafka.producer.flush.time.ns.total"
"org.apache.kafka.producer.incoming.byte.rate"
"org.apache.kafka.producer.incoming.byte.total"
"org.apache.kafka.producer.io.ratio"
"org.apache.kafka.producer.io.time.ns.avg"
"org.apache.kafka.producer.io.time.ns.total"
"org.apache.kafka.producer.io.wait.ratio"
"org.apache.kafka.producer.io.wait.time.ns.avg"
"org.apache.kafka.producer.io.wait.time.ns.total"
"org.apache.kafka.producer.io.waittime.total"
"org.apache.kafka.producer.iotime.total"
"org.apache.kafka.producer.metadata.age"
"org.apache.kafka.producer.metadata.wait.time.ns.total"
"org.apache.kafka.producer.network.io.rate"
"org.apache.kafka.producer.network.io.total"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.request.latency.avg"
"org.apache.kafka.producer.node.request.latency.avg"
"org.apache.kafka.producer.node.request.latency.max"
"org.apache.kafka.producer.node.request.latency.max"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.outgoing.byte.rate"
"org.apache.kafka.producer.outgoing.byte.total"
"org.apache.kafka.producer.produce.throttle.time.avg"
"org.apache.kafka.producer.produce.throttle.time.max"
"org.apache.kafka.producer.record.error.rate"
"org.apache.kafka.producer.record.error.total"
"org.apache.kafka.producer.record.queue.time.avg"
"org.apache.kafka.producer.record.queue.time.max"
"org.apache.kafka.producer.record.retry.rate"
"org.apache.kafka.producer.record.retry.total"
"org.apache.kafka.producer.record.send.rate"
"org.apache.kafka.producer.record.send.total"
"org.apache.kafka.producer.record.size.avg"
"org.apache.kafka.producer.record.size.max"
"org.apache.kafka.producer.records.per.request.avg"
"org.apache.kafka.producer.request.latency.avg"
"org.apache.kafka.producer.request.latency.max"
"org.apache.kafka.producer.request.rate"
"org.apache.kafka.producer.request.size.avg"
"org.apache.kafka.producer.request.size.max"
"org.apache.kafka.producer.request.total"
"org.apache.kafka.producer.requests.in.flight"
"org.apache.kafka.producer.response.rate"
"org.apache.kafka.producer.response.total"
"org.apache.kafka.producer.select.rate"
"org.apache.kafka.producer.select.total"
"org.apache.kafka.producer.successful.authentication.no.reauth.total"
"org.apache.kafka.producer.successful.authentication.rate"
"org.apache.kafka.producer.successful.authentication.total"
"org.apache.kafka.producer.successful.reauthentication.rate"
"org.apache.kafka.producer.successful.reauthentication.total"
"org.apache.kafka.producer.topic.byte.rate"
"org.apache.kafka.producer.topic.byte.rate"
"org.apache.kafka.producer.topic.byte.total"
"org.apache.kafka.producer.topic.byte.total"
"org.apache.kafka.producer.topic.compression.rate"
"org.apache.kafka.producer.topic.compression.rate"
"org.apache.kafka.producer.topic.record.error.rate"
"org.apache.kafka.producer.topic.record.error.rate"
"org.apache.kafka.producer.topic.record.error.total"
"org.apache.kafka.producer.topic.record.error.total"
"org.apache.kafka.producer.topic.record.retry.rate"
"org.apache.kafka.producer.topic.record.retry.rate"
"org.apache.kafka.producer.topic.record.retry.total"
"org.apache.kafka.producer.topic.record.retry.total"
"org.apache.kafka.producer.topic.record.send.rate"
"org.apache.kafka.producer.topic.record.send.rate"
"org.apache.kafka.producer.topic.record.send.total"
"org.apache.kafka.producer.topic.record.send.total"
"org.apache.kafka.producer.txn.abort.time.ns.total"
"org.apache.kafka.producer.txn.begin.time.ns.total"
"org.apache.kafka.producer.txn.commit.time.ns.total"
"org.apache.kafka.producer.txn.init.time.ns.total"
"org.apache.kafka.producer.txn.send.offsets.time.ns.total"
"org.apache.kafka.producer.waiting.threads"
kafka-kip-714
one.
{
  "metrics": [
    [ "kafka-kip-714", "org.apache.kafka.producer.topic.record.send.total", "topic", "load-test" ],
    [ "kafka-kip-714", "org.apache.kafka.consumer.fetch.manager.records.consumed.total", "topic", "load-test" ]
  ],
  "view": "gauge",
  "stacked": false,
  "region": "us-east-1",
  "yAxis": {
    "left": {
      "min": 0,
      "max": 50000
    }
  },
  "stat": "Sum",
  "period": 300,
  "liveData": true,
  "setPeriodToTimeRange": false,
  "sparkline": true,
  "trend": true
}
The first metric, org.apache.kafka.producer.topic.record.send.total, represents the number of records written, and the second metric, org.apache.kafka.consumer.fetch.manager.records.consumed.total, is the number of records read. Since these metrics are available on a per-topic basis, you can create nice dashboards showing the details of each topic.
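If you want the same pair of metrics for more than one topic, typing the "metrics" array by hand gets tedious. Here is a minimal sketch of how you could generate it, reusing the namespace and metric names from the widget definition above. The class name TopicWidgetBuilder and the topic orders are made up for illustration, and the code assumes topic names contain only the characters Kafka allows (letters, digits, '.', '_' and '-'), so no JSON escaping is done.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka or the AWS SDK.
final class TopicWidgetBuilder {

    // Namespace and metric names taken from the widget definition above.
    static final String NAMESPACE = "kafka-kip-714";
    static final String RECORDS_WRITTEN =
        "org.apache.kafka.producer.topic.record.send.total";
    static final String RECORDS_READ =
        "org.apache.kafka.consumer.fetch.manager.records.consumed.total";

    // Renders one entry of the "metrics" array: namespace, metric name,
    // then the "topic" dimension and its value.
    static String metricEntry(String metricName, String topic) {
        return String.format("[ \"%s\", \"%s\", \"topic\", \"%s\" ]",
            NAMESPACE, metricName, topic);
    }

    // Builds the whole "metrics" array with a written/read pair per topic.
    // Assumes topic names need no JSON escaping (see the note above).
    static String metricsArray(List<String> topics) {
        return topics.stream()
            .flatMap(topic -> Stream.of(
                metricEntry(RECORDS_WRITTEN, topic),
                metricEntry(RECORDS_READ, topic)))
            .collect(Collectors.joining(",\n    ", "[\n    ", "\n]"));
    }

    public static void main(String[] args) {
        // "orders" is a hypothetical second topic used only for illustration.
        System.out.println(metricsArray(List.of("load-test", "orders")));
    }
}
```

You can paste the printed array into the "metrics" field of the widget and keep the rest of the definition as it is. Depending on the range of values you expect, the "max" value of the y-axis may need adjusting once more topics are summed into the same widget.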