
KIP-714: Keep your Kafka Clusters Close, and your Kafka Clients Closer

For many years, the only way to capture Kafka client metrics was via JMX. Starting with Kafka 3.7.0, thanks to KIP-714, it is now possible to pull client metrics from your Kafka clusters using OpenTelemetry. This blog post explains how KIP-714 works, what you have to do to collect metrics, and gives you working code to test things out.

Published Jun 25, 2024
Last Modified Jun 26, 2024

Overview

It is undeniable how successful Apache Kafka is as a technology. Despite no longer being alone in the streaming data space, Kafka remains one of the best options for scenarios requiring distributed stream storage. But here is something that may surprise a few people: Kafka's success doesn't come exclusively from its brokers and cluster technology. Kafka's client APIs carry an enormous amount of weight too.
To properly understand this, it helps to think about how other messaging technologies work. Their client APIs are essentially what the name implies: a way for client applications to communicate with the system and exchange messages. Whether you are talking about JMS, AMQP, or MQTT, all of them follow the same design, which leads you to think that the actor with the lead role in the architecture is the messaging system itself. With Kafka, things are different. Kafka's clustering protocol is heavily centered on the clients, not only for producing and consuming messages, but also for helping the system implement details such as fault tolerance, availability, redundancy, and horizontal scaling.
Even when they are not exchanging messages with the cluster, the clients are constantly taking care of Kafka's clustering details. Take topic rebalancing as an example. Whenever the number of partitions of a topic changes, the consumer instances within a group must be reassigned, and that happens on the consumer side. Similarly, figuring out which broker is the leader for each partition, from the producer's perspective, happens on the producer side. For this reason, a properly designed observability strategy for your Kafka clusters must also include client metrics. With cluster metrics only, it is hard to identify the culprits of tricky performance problems. Worse, without looking at client metrics, you can't really pinpoint the root cause of problems, because very often they happen at the client level. Your Kafka clients are as important as your Kafka clusters, and your observability strategy must treat them as symbionts.
With KIP-714, it is now possible to pull client metrics from your Kafka clusters using OpenTelemetry. This considerably simplifies the way you can implement observability. This blog explains how KIP-714 works, what you have to do to collect metrics, and gives you working code to test things out.

Client monitoring problems

Since the early days of Kafka, client metrics have been available. Kafka clients are written in Java and therefore can expose JMX metrics via the JVM. As long as your monitoring system could collect JMX metrics, you could retrieve them and build your own monitoring solution. However, this approach proved to be troublesome over the years.
For starters, in order to connect to a JMX-enabled system, you need a monitoring agent built on Java. In the early 00s that wouldn't be much of a problem, since we thought Java would be the future of distributed computing. In that reality, any given Kafka client written in Java could easily process the metrics available in the JVM. Still in that same reality, if we wanted to outsource metric collection to another system, chances are that system would also be implemented in Java, and collecting the metrics would be one JMX API call away.
With the advent of microservices, other programming languages became as relevant as Java. The cloud was also an agent of change: modern programming languages designed to be as effective as Java for the cloud became mainstream. In this new reality, collecting metrics from JMX-enabled systems was no longer easy. In most cases, monitoring systems would need a Java agent to collect the metrics from JMX, make these metrics available in some transient system, and allow them to be retrieved via REST APIs. This creates a monitoring strategy that is fragile and hard to implement.
To complicate things even further, client metrics are collected on a per-client basis. This means that each client application requires its own agent installed to expose metrics to the observability backend. What is the problem with this, you may ask? The problem is scalability. Kafka wasn't designed to have just a few clients connected, producing and consuming messages. As a central nervous system designed to integrate virtually all the systems of an organization, you can expect at least a couple of thousand clients per cluster. Installing and maintaining agents for each client is, by itself, a recipe for organizational chaos.
The need for multiple agents also means higher observability costs. Some observability vendors price their software per monitored host. As such, if you need to install an agent on every host that runs a Kafka client application, your observability costs can become prohibitive. Suddenly, you start selecting only a few applications to monitor to decrease costs, but in turn, you also minimize your ability to identify the culprits of tricky performance problems.
Because of these client monitoring problems, KIP-714 was created.

How does KIP-714 work?

The idea behind KIP-714 is to allow client metrics to be collected at a central location and exposed to the outside world via a metrics reporter plugin. You can use your own plugin to send the metrics, which are made available in the OpenTelemetry metrics serialization format, to your observability backend. This design allows you to build flexible integration strategies that are not tied to specific observability vendors and are extensible enough for you to evolve your strategy at any time. At a high level, this is what KIP-714 looks like.
Client metrics collection is disabled by default. You need to explicitly enable support for client metrics collection in your Kafka cluster and specify which metrics you want to collect from the clients. Moreover, you need to specify the push interval, which controls how frequently clients push their metrics. This is done with a new utility tool called kafka-client-metrics.sh that can be found in the /bin folder of your Kafka distribution.
λ riferrei ~ → kafka-client-metrics.sh

This tool helps to manipulate and describe client metrics configurations.

Option                                  Description
------                                  -----------
--alter                                 Alter the configuration for the client metrics resource.
--bootstrap-server <String: server to   REQUIRED: The Kafka server to connect to.
  connect to>
--command-config <String: command       Property file containing configs to be passed to Admin Client.
  config property file>
--delete                                Delete the configuration for the client metrics resource.
--describe                              List configurations for the client metrics resource.
--generate-name                         Generate a UUID to use as the name.
--help                                  Print usage information.
--interval <Integer: push interval>     The metrics push interval in milliseconds.
--list                                  List the client metrics resources.
--match <String: k1=v1,k2=v2>           Matching selector 'k1=v1,k2=v2'. The following is a list of
                                        valid selector names:
                                          client_id
                                          client_instance_id
                                          client_software_name
                                          client_software_version
                                          client_source_address
                                          client_source_port
--metrics <String: m1,m2>               Telemetry metric name prefixes 'm1,m2'.
--name <String: name>                   Name of client metrics configuration resource.
--version                               Display Kafka version.
For example, to enable the collection of virtually all producer and consumer metrics from your clients, and push them every second, you can use the following command.
$KAFKA_HOME/bin/kafka-client-metrics.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --metrics org.apache.kafka.producer.,org.apache.kafka.consumer. \
  --alter --generate-name --interval 1000
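If you prefer to script this step rather than use the CLI, KIP-714 models a client metrics subscription as a regular configuration resource of type CLIENT_METRICS, so you can also create it with the Kafka Admin API. The sketch below is not part of the original demo: the subscription name is arbitrary, and the config keys ("metrics" and "interval.ms") follow the naming defined by KIP-714. Treat it as an illustrative starting point rather than a definitive implementation.

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Minimal sketch: creating a client metrics subscription with the Admin API.
// Assumes Kafka 3.7.0+ client libraries; the subscription name is illustrative.
public class CreateClientMetricsSubscription {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // A client metrics subscription is a config resource of type CLIENT_METRICS.
            ConfigResource resource =
                new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "kip-714-demo-subscription");
            Collection<AlterConfigOp> ops = List.of(
                // Same metric name prefixes used by the CLI command above.
                new AlterConfigOp(new ConfigEntry("metrics",
                    "org.apache.kafka.producer.,org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
                // Push interval of one second, in milliseconds.
                new AlterConfigOp(new ConfigEntry("interval.ms", "1000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(resource, ops)).all().get();
        }
    }
}

Either way, the result is the same: a subscription resource stored on the cluster telling clients which metrics to push and how often.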
Once the client metrics subscription is created, the metrics will be made available via a configured metrics reporter. Unfortunately, KIP-714 doesn't include a generic implementation that you can use as your metrics reporter. This is the part where you will need to get your hands dirty and write some Java code. But trust me, this isn't rocket science. As long as you follow the API specifications, you are going to be fine.
A metrics reporter is a Java class that implements the org.apache.kafka.common.metrics.MetricsReporter interface. This is what the interface looks like.
public interface MetricsReporter extends Reconfigurable, AutoCloseable {

void init(List<KafkaMetric> metrics);

void configure(Map<String, ?> configs);

void metricChange(KafkaMetric metric);

void metricRemoval(KafkaMetric metric);

void close();

}
As you can see, all you need to do is implement a set of callback methods that give you a way to react when a metric value changes or is removed. What you do with the values next is up to you. But for the sake of implementing a metrics reporter for your Kafka cluster, you don't need to get fancy. Ideally, most of the implementation focuses on creating the resources required to send the metrics out, and you can leverage the callback methods init(), configure(), and close() for this purpose.
💡 Here is an example of a Java class implementing the MetricsReporter interface.
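If you just want a feel for the shape of such a class before diving into that example, here is a minimal sketch (with hypothetical package and class names, not the demo's code) that simply logs metric changes:

package com.example.kafka.telemetry;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Minimal sketch of a metrics reporter that only logs metric changes.
public class LoggingMetricsReporter implements MetricsReporter {

    @Override
    public void configure(Map<String, ?> configs) {
        // Read reporter-specific configuration here (endpoints, credentials, and so on).
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        // Called once with the metrics that already exist when the reporter is registered.
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Called whenever a metric is added or updated.
        System.out.println("Metric changed: " + metric.metricName());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        // Called when a metric is removed.
    }

    @Override
    public void close() {
        // Release any resources created in configure() or init().
    }
}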
Implementing the MetricsReporter interface is the first step. This is how you extend Kafka's plugin framework to implement a reporter. However, to allow your Kafka brokers to receive metrics from the clients, your code also needs to implement the org.apache.kafka.server.telemetry.ClientTelemetry interface.
public interface ClientTelemetry {

ClientTelemetryReceiver clientReceiver();

}
This interface allows the metrics reporter to provide an instance of a ClientTelemetryReceiver. This is the component that will process the metrics serialized in the OpenTelemetry metrics format and send them to a compatible endpoint.
public interface ClientTelemetryReceiver {

void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

}
You must design the implementation of the ClientTelemetryReceiver to be as lightweight as possible. Chances are, this component will be cached by the Kafka broker and reused across several invocations. Moreover, sending metrics to the endpoint must use non-blocking APIs to ensure that Kafka won't end up with busy threads.
💡 Here is an example of a Java class implementing the ClientTelemetryReceiver interface.
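To see how the pieces fit together, here is a minimal sketch (again with hypothetical names, not the demo's code) of a reporter that also implements ClientTelemetry and hands the client-pushed payload to a receiver:

package com.example.kafka.telemetry;

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;

// Minimal sketch of a metrics reporter that can receive client-pushed telemetry.
public class Kip714MetricsReporterSketch implements MetricsReporter, ClientTelemetry {

    @Override
    public ClientTelemetryReceiver clientReceiver() {
        // The broker caches this receiver and calls it every time a client pushes metrics.
        return (AuthorizableRequestContext context, ClientTelemetryPayload payload) -> {
            // payload.data() holds the metrics serialized in the OpenTelemetry (OTLP) format.
            // Keep this path lightweight: hand the bytes off to an asynchronous exporter
            // (for example, an OTLP/HTTP client) instead of doing blocking I/O here.
            System.out.printf("Received %d bytes of telemetry from client instance %s%n",
                payload.data().remaining(), payload.clientInstanceId());
        };
    }

    // The MetricsReporter callbacks can stay empty for the purpose of this sketch.
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void init(List<KafkaMetric> metrics) { }
    @Override public void metricChange(KafkaMetric metric) { }
    @Override public void metricRemoval(KafkaMetric metric) { }
    @Override public void close() { }
}

A real implementation would forward the payload to an OpenTelemetry collector or another OTLP-compatible endpoint, which is exactly what the demo later in this post does.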
Once you finish your metrics reporter implementation, package your code into a JAR file and make it available on Kafka's classpath by adding the file to the $KAFKA_HOME/libs folder. You must also tell Kafka which metrics reporter to use. You can do this by setting the metric.reporters property of the Kafka broker. Here is how you can do this via environment variables.
export KAFKA_METRIC_REPORTERS=com.riferrei.kafka.KIP714MetricReporter
This is it. I know it may seem like a lot of groundwork to get things working, but the good news is that you only need to implement this once. The same implementation can be reused across different Kafka clusters.

Shipping metrics to CloudWatch

Enough with the theory. Let's get you experimenting with KIP-714 in a hands-on demo. This demo contains an example implementation of KIP-714 where the collected client metrics are pushed to an OpenTelemetry collector, which in turn sends the metrics to Amazon CloudWatch. Everything in this demo is implemented using Docker, and the necessary setup is done for you behind the scenes. All you need to do is start the containers and generate some traffic.
To get things started, you must have the prerequisites for the demo installed on your computer: Docker (with Docker Compose) and the AWS CLI.
Once you have everything installed, you can clone the repository that contains the code for this demo. Use the following command.
git clone https://github.com/riferrei/kafka-client-metrics-to-cloudwatch-with-kip-714.git
Change to the directory that contains the code.
cd kafka-client-metrics-to-cloudwatch-with-kip-714
Before you proceed, you must configure your AWS credentials using the AWS CLI. This is required because the code will try to push the collected client metrics to Amazon CloudWatch, and for this to happen, it needs to authenticate with the service. The code will use the AWS credentials stored in your ~/.aws directory for the default profile. Follow the instructions in the AWS CLI documentation to create your own set of credentials.
Now you can start the containers for this demo. The current directory contains a Docker Compose file that defines two containers: one called kafka, which is your Kafka broker, and another called collector, which is the OpenTelemetry collector functioning as a sidecar that pushes the collected client metrics to the observability backend. Start these containers with the following command.
docker compose up -d
It may take several minutes for Docker to download the required images before starting the containers. Once that happens, you won't see anything exciting going on, but trust me, there will be plenty going on behind the scenes. At this point, you will have your Kafka broker up and running, with a topic named load-test already created. This is the topic you will use to verify whether client metrics are being collected. Moreover, the Kafka broker will already be configured to allow the collection of both producer and consumer metrics. All you need to do to make this happen is write and read records to and from Kafka.
Let's start by writing 50K records into the topic load-test. You don't need to write a producer application for this. Kafka includes the kafka-producer-perf-test.sh utility tool exactly for this purpose. To load 50K records, with each record containing 1KB, at a rate of 1,000 records per second, use the following command.
kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:9092 --throughput 1000 --num-records 50000 --record-size 1024 --topic load-test --print-metrics
Once this command completes, you should see an output similar to this one:
4999 records sent, 999.4 records/sec (0.98 MB/sec), 8.2 ms avg latency, 292.0 ms max latency.
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 1.2 ms avg latency, 21.0 ms max latency.
5002 records sent, 999.2 records/sec (0.98 MB/sec), 1.1 ms avg latency, 11.0 ms max latency.
5007 records sent, 1000.4 records/sec (0.98 MB/sec), 1.2 ms avg latency, 19.0 ms max latency.
5007 records sent, 1000.8 records/sec (0.98 MB/sec), 1.4 ms avg latency, 14.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.5 ms avg latency, 21.0 ms max latency.
4993 records sent, 998.2 records/sec (0.97 MB/sec), 1.6 ms avg latency, 22.0 ms max latency.
5013 records sent, 1002.2 records/sec (0.98 MB/sec), 1.6 ms avg latency, 13.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.5 ms avg latency, 7.0 ms max latency.
50000 records sent, 999.880014 records/sec (0.98 MB/sec), 2.10 ms avg latency, 292.00 ms max latency, 1 ms 50th, 4 ms 95th, 10 ms 99th, 146 ms 99.9th.
Followed by the producer client metrics and their values.
Metric Name Value
app-info:commit-id:{client-id=perf-producer-client} : 2ae524ed625438c5
app-info:start-time-ms:{client-id=perf-producer-client} : 1719256696165
app-info:version:{client-id=perf-producer-client} : 3.7.0
kafka-metrics-count:count:{client-id=perf-producer-client} : 112.000
producer-metrics:batch-size-avg:{client-id=perf-producer-client} : 2478.732
producer-metrics:batch-size-max:{client-id=perf-producer-client} : 15556.000
producer-metrics:batch-split-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:batch-split-total:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-available-bytes:{client-id=perf-producer-client} : 33554432.000
producer-metrics:buffer-exhausted-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-exhausted-total:{client-id=perf-producer-client} : 0.000
producer-metrics:buffer-total-bytes:{client-id=perf-producer-client} : 33554432.000
producer-metrics:bufferpool-wait-ratio:{client-id=perf-producer-client} : 0.000
producer-metrics:bufferpool-wait-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:bufferpool-wait-time-total:{client-id=perf-producer-client} : 0.000
producer-metrics:compression-rate-avg:{client-id=perf-producer-client} : 1.000
producer-metrics:connection-close-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:connection-close-total:{client-id=perf-producer-client} : 0.000
producer-metrics:connection-count:{client-id=perf-producer-client} : 2.000
producer-metrics:connection-creation-rate:{client-id=perf-producer-client} : 0.040
producer-metrics:connection-creation-total:{client-id=perf-producer-client} : 2.000
producer-metrics:failed-authentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-authentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-reauthentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:failed-reauthentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:flush-time-ns-total:{client-id=perf-producer-client} : 1757375.000
producer-metrics:incoming-byte-rate:{client-id=perf-producer-client} : 25718.753
producer-metrics:incoming-byte-total:{client-id=perf-producer-client} : 1283803.000
producer-metrics:io-ratio:{client-id=perf-producer-client} : 0.052
producer-metrics:io-time-ns-avg:{client-id=perf-producer-client} : 42781.231
producer-metrics:io-time-ns-total:{client-id=perf-producer-client} : 2614831638.000
producer-metrics:io-wait-ratio:{client-id=perf-producer-client} : 0.897
producer-metrics:io-wait-time-ns-avg:{client-id=perf-producer-client} : 732721.341
producer-metrics:io-wait-time-ns-total:{client-id=perf-producer-client} : 44784661091.000
producer-metrics:io-waittime-total:{client-id=perf-producer-client} : 44784661091.000
producer-metrics:iotime-total:{client-id=perf-producer-client} : 2614831638.000
producer-metrics:metadata-age:{client-id=perf-producer-client} : 49.888
producer-metrics:metadata-wait-time-ns-total:{client-id=perf-producer-client} : 123723791.000
producer-metrics:network-io-rate:{client-id=perf-producer-client} : 858.070
producer-metrics:network-io-total:{client-id=perf-producer-client} : 42834.000
producer-metrics:outgoing-byte-rate:{client-id=perf-producer-client} : 1097380.516
producer-metrics:outgoing-byte-total:{client-id=perf-producer-client} : 54780138.000
producer-metrics:produce-throttle-time-avg:{client-id=perf-producer-client} : 0.000
producer-metrics:produce-throttle-time-max:{client-id=perf-producer-client} : 0.000
producer-metrics:reauthentication-latency-avg:{client-id=perf-producer-client} : NaN
producer-metrics:reauthentication-latency-max:{client-id=perf-producer-client} : NaN
producer-metrics:record-error-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:record-error-total:{client-id=perf-producer-client} : 0.000
producer-metrics:record-queue-time-avg:{client-id=perf-producer-client} : 0.074
producer-metrics:record-queue-time-max:{client-id=perf-producer-client} : 23.000
producer-metrics:record-retry-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:record-retry-total:{client-id=perf-producer-client} : 0.000
producer-metrics:record-send-rate:{client-id=perf-producer-client} : 1002.587
producer-metrics:record-send-total:{client-id=perf-producer-client} : 50000.000
producer-metrics:record-size-avg:{client-id=perf-producer-client} : 1110.000
producer-metrics:record-size-max:{client-id=perf-producer-client} : 1110.000
producer-metrics:records-per-request-avg:{client-id=perf-producer-client} : 2.340
producer-metrics:request-latency-avg:{client-id=perf-producer-client} : 1.458
producer-metrics:request-latency-max:{client-id=perf-producer-client} : 25.000
producer-metrics:request-rate:{client-id=perf-producer-client} : 429.035
producer-metrics:request-size-avg:{client-id=perf-producer-client} : 2557.788
producer-metrics:request-size-max:{client-id=perf-producer-client} : 15619.000
producer-metrics:request-total:{client-id=perf-producer-client} : 21417.000
producer-metrics:requests-in-flight:{client-id=perf-producer-client} : 0.000
producer-metrics:response-rate:{client-id=perf-producer-client} : 429.052
producer-metrics:response-total:{client-id=perf-producer-client} : 21417.000
producer-metrics:select-rate:{client-id=perf-producer-client} : 1224.109
producer-metrics:select-total:{client-id=perf-producer-client} : 61121.000
producer-metrics:successful-authentication-no-reauth-total:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-authentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-authentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-reauthentication-rate:{client-id=perf-producer-client} : 0.000
producer-metrics:successful-reauthentication-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-abort-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-begin-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-commit-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-init-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:txn-send-offsets-time-ns-total:{client-id=perf-producer-client} : 0.000
producer-metrics:waiting-threads:{client-id=perf-producer-client} : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=perf-producer-client, node-id=node--1} : 30.731
producer-node-metrics:incoming-byte-rate:{client-id=perf-producer-client, node-id=node-1} : 25709.654
producer-node-metrics:incoming-byte-total:{client-id=perf-producer-client, node-id=node--1} : 1534.000
producer-node-metrics:incoming-byte-total:{client-id=perf-producer-client, node-id=node-1} : 1282269.000
producer-node-metrics:outgoing-byte-rate:{client-id=perf-producer-client, node-id=node--1} : 9636.932
producer-node-metrics:outgoing-byte-rate:{client-id=perf-producer-client, node-id=node-1} : 1088659.542
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node--1} : 481066.000
producer-node-metrics:outgoing-byte-total:{client-id=perf-producer-client, node-id=node-1} : 54299072.000
producer-node-metrics:request-latency-avg:{client-id=perf-producer-client, node-id=node--1} : NaN
producer-node-metrics:request-latency-avg:{client-id=perf-producer-client, node-id=node-1} : 1.458
producer-node-metrics:request-latency-max:{client-id=perf-producer-client, node-id=node--1} : NaN
producer-node-metrics:request-latency-max:{client-id=perf-producer-client, node-id=node-1} : 25.000
producer-node-metrics:request-rate:{client-id=perf-producer-client, node-id=node--1} : 1.062
producer-node-metrics:request-rate:{client-id=perf-producer-client, node-id=node-1} : 428.334
producer-node-metrics:request-size-avg:{client-id=perf-producer-client, node-id=node--1} : 9076.717
producer-node-metrics:request-size-avg:{client-id=perf-producer-client, node-id=node-1} : 2541.615
producer-node-metrics:request-size-max:{client-id=perf-producer-client, node-id=node--1} : 9813.000
producer-node-metrics:request-size-max:{client-id=perf-producer-client, node-id=node-1} : 15619.000
producer-node-metrics:request-total:{client-id=perf-producer-client, node-id=node--1} : 53.000
producer-node-metrics:request-total:{client-id=perf-producer-client, node-id=node-1} : 21364.000
producer-node-metrics:response-rate:{client-id=perf-producer-client, node-id=node--1} : 1.062
producer-node-metrics:response-rate:{client-id=perf-producer-client, node-id=node-1} : 428.351
producer-node-metrics:response-total:{client-id=perf-producer-client, node-id=node--1} : 53.000
producer-node-metrics:response-total:{client-id=perf-producer-client, node-id=node-1} : 21364.000
producer-topic-metrics:byte-rate:{client-id=perf-producer-client, topic=load-test} : 1061823.601
producer-topic-metrics:byte-total:{client-id=perf-producer-client, topic=load-test} : 52953143.000
producer-topic-metrics:compression-rate:{client-id=perf-producer-client, topic=load-test} : 1.000
producer-topic-metrics:record-error-rate:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-error-total:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-retry-rate:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=load-test} : 0.000
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=load-test} : 1002.607
producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=load-test} : 50000.000
Now that records have been written to the topic, you can consume them. Kafka also includes the kafka-consumer-perf-test.sh utility tool for this purpose. To read the 50K records, use the following command.
kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --messages 50000 --topic load-test --print-metrics
Once this command completes, you should see an output similar to this one:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2024-06-24 15:24:32:294, 2024-06-24 15:24:33:084, 48.8281, 61.8078, 50000, 63291.1392, 484, 306, 159.5690, 163398.6928
Followed by the consumer client metrics and their values.
Metric Name Value
consumer-coordinator-metrics:assigned-partitions:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:commit-latency-avg:{client-id=perf-consumer-client} : 10.000
consumer-coordinator-metrics:commit-latency-max:{client-id=perf-consumer-client} : 10.000
consumer-coordinator-metrics:commit-rate:{client-id=perf-consumer-client} : 0.033
consumer-coordinator-metrics:commit-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:failed-rebalance-rate-per-hour:{client-id=perf-consumer-client} : 347.759
consumer-coordinator-metrics:failed-rebalance-total:{client-id=perf-consumer-client} : 3.000
consumer-coordinator-metrics:heartbeat-rate:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:heartbeat-response-time-max:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:heartbeat-total:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:join-rate:{client-id=perf-consumer-client} : 0.032
consumer-coordinator-metrics:join-time-avg:{client-id=perf-consumer-client} : 9.000
consumer-coordinator-metrics:join-time-max:{client-id=perf-consumer-client} : 9.000
consumer-coordinator-metrics:join-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:last-heartbeat-seconds-ago:{client-id=perf-consumer-client} : -1.000
consumer-coordinator-metrics:last-rebalance-seconds-ago:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-assigned-latency-avg:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-assigned-latency-max:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-lost-latency-avg:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:partition-lost-latency-max:{client-id=perf-consumer-client} : NaN
consumer-coordinator-metrics:partition-revoked-latency-avg:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:partition-revoked-latency-max:{client-id=perf-consumer-client} : 0.000
consumer-coordinator-metrics:rebalance-latency-avg:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-latency-max:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-latency-total:{client-id=perf-consumer-client} : 241.000
consumer-coordinator-metrics:rebalance-rate-per-hour:{client-id=perf-consumer-client} : 116.788
consumer-coordinator-metrics:rebalance-total:{client-id=perf-consumer-client} : 1.000
consumer-coordinator-metrics:sync-rate:{client-id=perf-consumer-client} : 0.032
consumer-coordinator-metrics:sync-time-avg:{client-id=perf-consumer-client} : 11.000
consumer-coordinator-metrics:sync-time-max:{client-id=perf-consumer-client} : 11.000
consumer-coordinator-metrics:sync-total:{client-id=perf-consumer-client} : 1.000
consumer-fetch-manager-metrics:bytes-consumed-rate:{client-id=perf-consumer-client, topic=load-test} : 1678364.853
consumer-fetch-manager-metrics:bytes-consumed-rate:{client-id=perf-consumer-client} : 1678364.853
consumer-fetch-manager-metrics:bytes-consumed-total:{client-id=perf-consumer-client, topic=load-test} : 51650000.000
consumer-fetch-manager-metrics:bytes-consumed-total:{client-id=perf-consumer-client} : 51650000.000
consumer-fetch-manager-metrics:fetch-latency-avg:{client-id=perf-consumer-client} : 14.385
consumer-fetch-manager-metrics:fetch-latency-max:{client-id=perf-consumer-client} : 508.000
consumer-fetch-manager-metrics:fetch-rate:{client-id=perf-consumer-client} : 1.689
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=perf-consumer-client, topic=load-test} : 1012745.098
consumer-fetch-manager-metrics:fetch-size-avg:{client-id=perf-consumer-client} : 1012745.098
consumer-fetch-manager-metrics:fetch-size-max:{client-id=perf-consumer-client, topic=load-test} : 1029901.000
consumer-fetch-manager-metrics:fetch-size-max:{client-id=perf-consumer-client} : 1029901.000
consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=perf-consumer-client} : 0.000
consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=perf-consumer-client} : 0.000
consumer-fetch-manager-metrics:fetch-total:{client-id=perf-consumer-client} : 52.000
consumer-fetch-manager-metrics:preferred-read-replica:{client-id=perf-consumer-client, topic=load-test, partition=0} : -1
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=perf-consumer-client, topic=load-test} : 1624.748
consumer-fetch-manager-metrics:records-consumed-rate:{client-id=perf-consumer-client} : 1624.695
consumer-fetch-manager-metrics:records-consumed-total:{client-id=perf-consumer-client, topic=load-test} : 50000.000
consumer-fetch-manager-metrics:records-consumed-total:{client-id=perf-consumer-client} : 50000.000
consumer-fetch-manager-metrics:records-lag-avg:{client-id=perf-consumer-client, topic=load-test, partition=0} : 24515.608
consumer-fetch-manager-metrics:records-lag-max:{client-id=perf-consumer-client, topic=load-test, partition=0} : 49500.000
consumer-fetch-manager-metrics:records-lag-max:{client-id=perf-consumer-client} : 49500.000
consumer-fetch-manager-metrics:records-lag:{client-id=perf-consumer-client, topic=load-test, partition=0} : 0.000
consumer-fetch-manager-metrics:records-lead-avg:{client-id=perf-consumer-client, topic=load-test, partition=0} : 25484.392
consumer-fetch-manager-metrics:records-lead-min:{client-id=perf-consumer-client, topic=load-test, partition=0} : 500.000
consumer-fetch-manager-metrics:records-lead-min:{client-id=perf-consumer-client} : 500.000
consumer-fetch-manager-metrics:records-lead:{client-id=perf-consumer-client, topic=load-test, partition=0} : 50000.000
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=perf-consumer-client, topic=load-test} : 980.392
consumer-fetch-manager-metrics:records-per-request-avg:{client-id=perf-consumer-client} : 980.392
kafka-metrics-count:count:{client-id=perf-consumer-client} : 61.000
Done. At this point, both producer and consumer metrics have been collected from the clients and pushed to Amazon CloudWatch. If your AWS CLI is properly configured, you can check this by listing all the metrics in the kafka-kip-714 namespace. Note that the OpenTelemetry collector in this demo is configured to send all the metrics to the us-east-1 region.
aws cloudwatch list-metrics --region us-east-1 --namespace kafka-kip-714 --output json | jq '.Metrics | .[] | "\(.MetricName)"' | sort
You should see the following output:
"org.apache.kafka.consumer.commit.sync.time.ns.total"
"org.apache.kafka.consumer.committed.time.ns.total"
"org.apache.kafka.consumer.connection.close.rate"
"org.apache.kafka.consumer.connection.close.total"
"org.apache.kafka.consumer.connection.count"
"org.apache.kafka.consumer.connection.creation.rate"
"org.apache.kafka.consumer.connection.creation.total"
"org.apache.kafka.consumer.coordinator.assigned.partitions"
"org.apache.kafka.consumer.coordinator.commit.latency.avg"
"org.apache.kafka.consumer.coordinator.commit.latency.max"
"org.apache.kafka.consumer.coordinator.commit.rate"
"org.apache.kafka.consumer.coordinator.commit.total"
"org.apache.kafka.consumer.coordinator.failed.rebalance.rate.per.hour"
"org.apache.kafka.consumer.coordinator.failed.rebalance.total"
"org.apache.kafka.consumer.coordinator.heartbeat.rate"
"org.apache.kafka.consumer.coordinator.heartbeat.response.time.max"
"org.apache.kafka.consumer.coordinator.heartbeat.total"
"org.apache.kafka.consumer.coordinator.join.rate"
"org.apache.kafka.consumer.coordinator.join.time.avg"
"org.apache.kafka.consumer.coordinator.join.time.max"
"org.apache.kafka.consumer.coordinator.join.total"
"org.apache.kafka.consumer.coordinator.last.heartbeat.seconds.ago"
"org.apache.kafka.consumer.coordinator.last.rebalance.seconds.ago"
"org.apache.kafka.consumer.coordinator.partition.assigned.latency.avg"
"org.apache.kafka.consumer.coordinator.partition.assigned.latency.max"
"org.apache.kafka.consumer.coordinator.rebalance.latency.avg"
"org.apache.kafka.consumer.coordinator.rebalance.latency.max"
"org.apache.kafka.consumer.coordinator.rebalance.latency.total"
"org.apache.kafka.consumer.coordinator.rebalance.rate.per.hour"
"org.apache.kafka.consumer.coordinator.rebalance.total"
"org.apache.kafka.consumer.coordinator.sync.rate"
"org.apache.kafka.consumer.coordinator.sync.time.avg"
"org.apache.kafka.consumer.coordinator.sync.time.max"
"org.apache.kafka.consumer.coordinator.sync.total"
"org.apache.kafka.consumer.failed.authentication.rate"
"org.apache.kafka.consumer.failed.authentication.total"
"org.apache.kafka.consumer.failed.reauthentication.rate"
"org.apache.kafka.consumer.failed.reauthentication.total"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.total"
"org.apache.kafka.consumer.fetch.manager.bytes.consumed.total"
"org.apache.kafka.consumer.fetch.manager.fetch.latency.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.latency.max"
"org.apache.kafka.consumer.fetch.manager.fetch.rate"
"org.apache.kafka.consumer.fetch.manager.fetch.size.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.size.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.size.max"
"org.apache.kafka.consumer.fetch.manager.fetch.size.max"
"org.apache.kafka.consumer.fetch.manager.fetch.throttle.time.avg"
"org.apache.kafka.consumer.fetch.manager.fetch.throttle.time.max"
"org.apache.kafka.consumer.fetch.manager.fetch.total"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.preferred.read.replica"
"org.apache.kafka.consumer.fetch.manager.records.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.records.consumed.rate"
"org.apache.kafka.consumer.fetch.manager.records.consumed.total"
"org.apache.kafka.consumer.fetch.manager.records.consumed.total"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.avg"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lag.max"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.avg"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.lead.min"
"org.apache.kafka.consumer.fetch.manager.records.per.request.avg"
"org.apache.kafka.consumer.fetch.manager.records.per.request.avg"
"org.apache.kafka.consumer.incoming.byte.rate"
"org.apache.kafka.consumer.incoming.byte.total"
"org.apache.kafka.consumer.io.ratio"
"org.apache.kafka.consumer.io.time.ns.avg"
"org.apache.kafka.consumer.io.time.ns.total"
"org.apache.kafka.consumer.io.wait.ratio"
"org.apache.kafka.consumer.io.wait.time.ns.avg"
"org.apache.kafka.consumer.io.wait.time.ns.total"
"org.apache.kafka.consumer.io.waittime.total"
"org.apache.kafka.consumer.iotime.total"
"org.apache.kafka.consumer.last.poll.seconds.ago"
"org.apache.kafka.consumer.network.io.rate"
"org.apache.kafka.consumer.network.io.total"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.rate"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.incoming.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.rate"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.outgoing.byte.total"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.rate"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.avg"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.size.max"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.request.total"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.rate"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.node.response.total"
"org.apache.kafka.consumer.outgoing.byte.rate"
"org.apache.kafka.consumer.outgoing.byte.total"
"org.apache.kafka.consumer.poll.idle.ratio.avg"
"org.apache.kafka.consumer.request.rate"
"org.apache.kafka.consumer.request.size.avg"
"org.apache.kafka.consumer.request.size.max"
"org.apache.kafka.consumer.request.total"
"org.apache.kafka.consumer.response.rate"
"org.apache.kafka.consumer.response.total"
"org.apache.kafka.consumer.select.rate"
"org.apache.kafka.consumer.select.total"
"org.apache.kafka.consumer.successful.authentication.no.reauth.total"
"org.apache.kafka.consumer.successful.authentication.rate"
"org.apache.kafka.consumer.successful.authentication.total"
"org.apache.kafka.consumer.successful.reauthentication.rate"
"org.apache.kafka.consumer.successful.reauthentication.total"
"org.apache.kafka.consumer.time.between.poll.avg"
"org.apache.kafka.consumer.time.between.poll.max"
"org.apache.kafka.producer.batch.size.avg"
"org.apache.kafka.producer.batch.size.max"
"org.apache.kafka.producer.batch.split.rate"
"org.apache.kafka.producer.batch.split.total"
"org.apache.kafka.producer.buffer.available.bytes"
"org.apache.kafka.producer.buffer.exhausted.rate"
"org.apache.kafka.producer.buffer.exhausted.total"
"org.apache.kafka.producer.buffer.total.bytes"
"org.apache.kafka.producer.bufferpool.wait.ratio"
"org.apache.kafka.producer.bufferpool.wait.time.ns.total"
"org.apache.kafka.producer.bufferpool.wait.time.total"
"org.apache.kafka.producer.compression.rate.avg"
"org.apache.kafka.producer.connection.close.rate"
"org.apache.kafka.producer.connection.close.total"
"org.apache.kafka.producer.connection.count"
"org.apache.kafka.producer.connection.creation.rate"
"org.apache.kafka.producer.connection.creation.total"
"org.apache.kafka.producer.failed.authentication.rate"
"org.apache.kafka.producer.failed.authentication.total"
"org.apache.kafka.producer.failed.reauthentication.rate"
"org.apache.kafka.producer.failed.reauthentication.total"
"org.apache.kafka.producer.flush.time.ns.total"
"org.apache.kafka.producer.incoming.byte.rate"
"org.apache.kafka.producer.incoming.byte.total"
"org.apache.kafka.producer.io.ratio"
"org.apache.kafka.producer.io.time.ns.avg"
"org.apache.kafka.producer.io.time.ns.total"
"org.apache.kafka.producer.io.wait.ratio"
"org.apache.kafka.producer.io.wait.time.ns.avg"
"org.apache.kafka.producer.io.wait.time.ns.total"
"org.apache.kafka.producer.io.waittime.total"
"org.apache.kafka.producer.iotime.total"
"org.apache.kafka.producer.metadata.age"
"org.apache.kafka.producer.metadata.wait.time.ns.total"
"org.apache.kafka.producer.network.io.rate"
"org.apache.kafka.producer.network.io.total"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.rate"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.incoming.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.rate"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.outgoing.byte.total"
"org.apache.kafka.producer.node.request.latency.avg"
"org.apache.kafka.producer.node.request.latency.avg"
"org.apache.kafka.producer.node.request.latency.max"
"org.apache.kafka.producer.node.request.latency.max"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.rate"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.avg"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.size.max"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.request.total"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.rate"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.node.response.total"
"org.apache.kafka.producer.outgoing.byte.rate"
"org.apache.kafka.producer.outgoing.byte.total"
"org.apache.kafka.producer.produce.throttle.time.avg"
"org.apache.kafka.producer.produce.throttle.time.max"
"org.apache.kafka.producer.record.error.rate"
"org.apache.kafka.producer.record.error.total"
"org.apache.kafka.producer.record.queue.time.avg"
"org.apache.kafka.producer.record.queue.time.max"
"org.apache.kafka.producer.record.retry.rate"
"org.apache.kafka.producer.record.retry.total"
"org.apache.kafka.producer.record.send.rate"
"org.apache.kafka.producer.record.send.total"
"org.apache.kafka.producer.record.size.avg"
"org.apache.kafka.producer.record.size.max"
"org.apache.kafka.producer.records.per.request.avg"
"org.apache.kafka.producer.request.latency.avg"
"org.apache.kafka.producer.request.latency.max"
"org.apache.kafka.producer.request.rate"
"org.apache.kafka.producer.request.size.avg"
"org.apache.kafka.producer.request.size.max"
"org.apache.kafka.producer.request.total"
"org.apache.kafka.producer.requests.in.flight"
"org.apache.kafka.producer.response.rate"
"org.apache.kafka.producer.response.total"
"org.apache.kafka.producer.select.rate"
"org.apache.kafka.producer.select.total"
"org.apache.kafka.producer.successful.authentication.no.reauth.total"
"org.apache.kafka.producer.successful.authentication.rate"
"org.apache.kafka.producer.successful.authentication.total"
"org.apache.kafka.producer.successful.reauthentication.rate"
"org.apache.kafka.producer.successful.reauthentication.total"
"org.apache.kafka.producer.topic.byte.rate"
"org.apache.kafka.producer.topic.byte.rate"
"org.apache.kafka.producer.topic.byte.total"
"org.apache.kafka.producer.topic.byte.total"
"org.apache.kafka.producer.topic.compression.rate"
"org.apache.kafka.producer.topic.compression.rate"
"org.apache.kafka.producer.topic.record.error.rate"
"org.apache.kafka.producer.topic.record.error.rate"
"org.apache.kafka.producer.topic.record.error.total"
"org.apache.kafka.producer.topic.record.error.total"
"org.apache.kafka.producer.topic.record.retry.rate"
"org.apache.kafka.producer.topic.record.retry.rate"
"org.apache.kafka.producer.topic.record.retry.total"
"org.apache.kafka.producer.topic.record.retry.total"
"org.apache.kafka.producer.topic.record.send.rate"
"org.apache.kafka.producer.topic.record.send.rate"
"org.apache.kafka.producer.topic.record.send.total"
"org.apache.kafka.producer.topic.record.send.total"
"org.apache.kafka.producer.txn.abort.time.ns.total"
"org.apache.kafka.producer.txn.begin.time.ns.total"
"org.apache.kafka.producer.txn.commit.time.ns.total"
"org.apache.kafka.producer.txn.init.time.ns.total"
"org.apache.kafka.producer.txn.send.offsets.time.ns.total"
"org.apache.kafka.producer.waiting.threads"
If you see the same output, it means the metrics were successfully stored in CloudWatch and you can use its visualization tools to play with them. In fact, let's do this right now. Open the AWS console and navigate to the CloudWatch service.
In the left menu, click All Metrics to open the metrics graph visualizer. You should see all the namespaces containing metrics from your AWS services and applications, including kafka-kip-714.
If you know your way around creating graph visualizations for CloudWatch metrics, feel free to play with some of the stored metrics. But if you are new to CloudWatch and just want to see some nice graphs for the 50K records processed by Kafka, you can use the following code.
{
"metrics": [
[ "kafka-kip-714", "org.apache.kafka.producer.topic.record.send.total", "topic", "load-test" ],
[ "kafka-kip-714", "org.apache.kafka.consumer.fetch.manager.records.consumed.total", "topic", "load-test" ]
],
"view": "gauge",
"stacked": false,
"region": "us-east-1",
"yAxis": {
"left": {
"min": 0,
"max": 50000
}
},
"stat": "Sum",
"period": 300,
"liveData": true,
"setPeriodToTimeRange": false,
"sparkline": true,
"trend": true
}
To use this code, click on the Source option right below the metrics visualizer, and paste its entire content.
After this, click the Update button. The graph visualizer should update with the following result.
These two metrics, displayed as gauges, show the number of records processed by Kafka. The first metric, org.apache.kafka.producer.topic.record.send.total, represents the number of records written, and the second, org.apache.kafka.consumer.fetch.manager.records.consumed.total, is the number of records read. Since all the metrics are available on a per-topic basis, you can create nice dashboards showing the details of each topic.
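If you would rather verify the numbers programmatically than through the console, a small sketch using the AWS SDK for Java v2 (not part of the original demo) could fetch the same producer metric. The namespace, metric name, and dimension come from the demo; everything else here is illustrative.

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;

// Minimal sketch: sum the records written to the load-test topic over the last hour.
public class QueryKip714Metrics {
    public static void main(String[] args) {
        try (CloudWatchClient cloudWatch = CloudWatchClient.builder().region(Region.US_EAST_1).build()) {
            GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder()
                .namespace("kafka-kip-714")
                .metricName("org.apache.kafka.producer.topic.record.send.total")
                .dimensions(Dimension.builder().name("topic").value("load-test").build())
                .startTime(Instant.now().minus(1, ChronoUnit.HOURS))
                .endTime(Instant.now())
                .period(300)
                .statistics(Statistic.SUM)
                .build();
            for (Datapoint datapoint : cloudWatch.getMetricStatistics(request).datapoints()) {
                System.out.printf("%s -> %.0f records sent%n", datapoint.timestamp(), datapoint.sum());
            }
        }
    }
}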

Summary

Your Kafka clients are as important as your Kafka clusters, and your observability strategy must treat them as symbionts. This matters because, without proper visibility into Kafka client metrics, it is hard to identify the culprits of performance problems. This blog post highlighted the importance of collecting both cluster-level and client-level metrics, and how KIP-714 helps with that.
It explained the design problems KIP-714 helps solve, such as the dependency on JMX and the need for a per-client agent collector. With a much simpler metrics collection architecture, KIP-714 allows you to quickly pull client metrics with only a few changes. This blog post also highlighted what those changes are and how you can implement them in your own Kafka cluster.
All of this was shown in practice with a hands-on demo of sending client metrics to Amazon CloudWatch. Since KIP-714 was built on top of the OpenTelemetry metrics standard, you can easily customize the demo to use your own observability backend.

About the Author: Ricardo Ferreira works for AWS in the developer relations team. You can follow him on LinkedIn, Twitter, and Instagram.
 
