KIP-714: Keep your Kafka Clusters Close, and your Kafka Clients Closer

For many years, the only way to capture Kafka client metrics was through JMX. Starting with Kafka 3.7.0, and thanks to KIP-714, it is now possible to pull client metrics from your Kafka clusters using OpenTelemetry. This blog post explains how KIP-714 works, what you have to do to collect metrics, and gives you working code to test things out.

Ricardo Ferreira
Amazon Employee
Published Jun 25, 2024
Last Modified Jun 26, 2024

Overview

It is undeniable how successful Apache Kafka is as a technology. Despite no longer being alone in the streaming data space, Kafka remains one of the best options for scenarios requiring distributed stream storage. But here is something that may surprise a few people: Kafka's success doesn't come exclusively from its brokers and cluster technology. Kafka's client APIs carry enormous weight in this too.
To properly understand this, it helps to think about how other messaging technologies work. Their client APIs are essentially what the name implies: a way for client applications to communicate with the system and exchange messages. Whether you are talking about JMS, AMQP, or MQTT, all of them follow the same design. It leads you to think that the actor with the lead role in the architecture is the messaging system itself. With Kafka, however, things are different. Kafka's clustering protocol is heavily centered on the clients, which not only produce and consume messages but also help the system implement details such as fault tolerance, availability, redundancy, and horizontal scaling.
Even without exchanging messages with the cluster, the clients are constantly taking care of Kafka's clustering details. Take topic rebalancing as an example. Whenever the number of partitions of a topic changes, the partitions assigned to the consumer instances within a group must be redistributed, and that happens at the consumer side. Similarly, deciding which partition each record is written to happens at the producer side. For this reason, a proper observability strategy for your Kafka clusters must also include client metrics. With cluster metrics alone, it is hard to identify the culprits of tricky performance problems. Worse, without looking at client metrics, you can't really pinpoint the root cause of problems, because very often they happen at the client level. Your Kafka clients are as important as your Kafka clusters, and your observability strategy must treat them as symbionts.
With KIP-714, it is now possible to pull client metrics from your Kafka clusters using OpenTelemetry. This considerably simplifies the way you can implement observability. This blog explains how KIP-714 works, what you have to do to collect metrics, and gives you working code to test things out.

Client monitoring problems

Since the early days of Kafka, client metrics have been available. The default Kafka clients are written in Java and can therefore expose JMX metrics via the JVM. As long as your monitoring system could collect JMX metrics, you could retrieve the metrics and build your own monitoring solution. However, this approach proved to be troublesome over the years.
For starters, in order to connect with a JMX-enabled system, you must have a monitoring agent built on Java. In the early 00s that wouldn't be much of a problem, since we thought Java would be the future of distributed computing. In that reality, any given Kafka client written in Java could easily process the metrics available in the JVM. Still in the same reality, if we wanted to outsource metric collection to another system, chances are that system would also be implemented in Java, and collecting the metrics was one JMX API call away.
With the advent of microservices, other programming languages became as relevant as Java. The cloud was also a mechanism of change. Modern programming languages designed to be as effective as Java for the cloud became mainstream. In this new reality, collecting metrics from JMX-enabled systems was no longer easy. In most cases, monitoring systems would need a Java agent to collect the metrics from JMX, make those metrics available in some transient system, and allow them to be retrieved via REST APIs. This creates a monitoring strategy that is fragile and hard to implement.
To complicate things even further, client metrics are collected on a per-client basis. This means that each client application requires its own agent installed to expose metrics to the observability backend. What is the problem with this, you may ask? The problem is scalability. Kafka wasn't designed to have just a few clients connected, producing and consuming messages. As a central nervous system designed to integrate virtually every system in an organization, it can easily end up with a couple thousand clients per cluster. Installing and maintaining agents for each client is by itself a recipe for organizational chaos.
The need for multiple agents also means higher observability costs. Some observability vendors price their software per monitored host. If you need to install an agent on every host that executes a Kafka client application, your observability costs can quickly become prohibitive. Suddenly, you start monitoring only a few selected applications to decrease costs, but in turn, you also reduce your ability to identify the culprit of tricky performance problems.
Because of these client monitoring problems, KIP-714 was created.

How does KIP-714 work?

The idea behind KIP-714 is to allow client metrics to be pulled from a central location and exposed to the outside world via a metrics plugin. You can use your own plugin to send the metrics, serialized in the OpenTelemetry metrics format, to your observability backend. This design allows you to build flexible integration strategies: not tied to specific observability vendors, and extensible enough for you to evolve your strategy at any time. At a high level, this is how KIP-714 works.
Client metrics collection is disabled by default. You need to explicitly enable support for client metrics collection in your Kafka cluster and specify which metrics you want to collect from the clients. Moreover, you need to specify the push interval, which controls how frequently clients push their metrics. This is done with a new utility tool called kafka-client-metrics.sh that can be found in the bin folder of your Kafka distribution.
For example, to enable the collection of virtually all producer and consumer metrics from your clients, and to push them every second, you can use a command along the lines of the following.
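This is a sketch based on the flags kafka-client-metrics.sh accepts in Kafka 3.7 (--alter, --name, --metrics, --interval); the subscription name and the bootstrap address are placeholders you should adapt.

```bash
bin/kafka-client-metrics.sh --bootstrap-server localhost:9092 \
  --alter --name kip-714-demo \
  --metrics org.apache.kafka.producer.,org.apache.kafka.consumer. \
  --interval 1000
```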
Once the client metrics subscription is created, the metrics will be made available via a configured metrics reporter. Unfortunately, KIP-714 doesn't include any generic implementation that you can use as your metrics reporter. This is the part where you will need to get your hands dirty and write some Java code. But trust me, this isn't rocket science. As long as you follow the API specifications, you are going to be fine.
A metrics reporter is a Java class that implements the org.apache.kafka.common.metrics.MetricsReporter interface. This is what the interface looks like.
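Abridged here to its core callbacks; the real interface also inherits configure() from Configurable and declares a few default methods:

```java
public interface MetricsReporter extends Reconfigurable, AutoCloseable {

    // Called once with all the metrics that exist when the reporter is registered.
    void init(List<KafkaMetric> metrics);

    // Called whenever a metric is added or updated.
    void metricChange(KafkaMetric metric);

    // Called whenever a metric is removed.
    void metricRemoval(KafkaMetric metric);

    // Called when the reporter is being shut down.
    void close();
}
```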
As you can see, all you need to do is implement a set of callback methods that give you a way to react when a metric value changes or is removed. What you do with the values next is up to you. But for the sake of implementing a metrics reporter for your Kafka cluster, you don't need to get fancy. Ideally, most of the implementation focuses on creating the resources required to send the metrics out, and you can leverage the callback methods init(), configure(), and close() for this purpose.
💡 Here is an example of a Java class implementing the MetricsReporter interface.
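A minimal sketch (the package and class names are hypothetical); it simply keeps a live registry of the current metrics:

```java
package io.example.kip714; // hypothetical package

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class OtlpMetricsReporter implements MetricsReporter {

    // Live view of every metric the broker currently exposes.
    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // Read any custom reporter properties here (e.g., a collector endpoint).
    }

    @Override
    public void init(List<KafkaMetric> initial) {
        initial.forEach(metric -> metrics.put(metric.metricName(), metric));
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        metrics.put(metric.metricName(), metric);
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    @Override
    public void close() {
        // Release any resources created in configure() or init().
        metrics.clear();
    }
}
```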
Implementing the MetricsReporter interface is the first step. This is your way into Kafka's plugin framework. However, to allow your Kafka brokers to receive metrics from the clients, your reporter also needs to implement the org.apache.kafka.server.telemetry.ClientTelemetry interface.
This interface allows the metrics reporter to provide an instance of a ClientTelemetryReceiver. This is the component that will process the metrics serialized in the OpenTelemetry format and send them to a compatible endpoint.
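Both interfaces, abridged from the org.apache.kafka.server.telemetry package (each lives in its own file in the real codebase):

```java
public interface ClientTelemetry {
    // Returns the receiver the broker hands client metric payloads to.
    ClientTelemetryReceiver clientReceiver();
}

public interface ClientTelemetryReceiver {
    // Called by the broker for every metrics push coming from a client.
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}
```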
You must design the implementation of the ClientTelemetryReceiver to be as lightweight as possible. Chances are, this component will be cached by the Kafka broker and reused across several invocations. Moreover, sending metrics to the endpoint must use non-blocking APIs to ensure that Kafka won't end up with busy threads.
💡 Here is an example of a Java class implementing the ClientTelemetryReceiver interface.
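A minimal sketch, assuming the hypothetical OtlpMetricsReporter above also implements ClientTelemetry and returns this receiver from clientReceiver(). The collector endpoint is an assumption; real code should also inspect payload.contentType(), since payloads may be compressed:

```java
package io.example.kip714; // hypothetical package

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;

import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;

public class OtlpTelemetryReceiver implements ClientTelemetryReceiver {

    // Assumed OTLP/HTTP endpoint of an OpenTelemetry collector.
    private static final URI ENDPOINT = URI.create("http://localhost:4318/v1/metrics");

    private final HttpClient httpClient = HttpClient.newHttpClient();

    @Override
    public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload) {
        // The payload carries OTLP MetricsData, already serialized by the client.
        ByteBuffer data = payload.data();
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);

        HttpRequest request = HttpRequest.newBuilder(ENDPOINT)
                .header("Content-Type", "application/x-protobuf")
                .POST(HttpRequest.BodyPublishers.ofByteArray(bytes))
                .build();

        // sendAsync keeps the broker thread free, as KIP-714 recommends.
        httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding());
    }
}
```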
Once you finish your metrics reporter implementation, package your code into a JAR file and make it available on Kafka's classpath by adding the file to the $KAFKA_HOME/libs folder. You must also tell Kafka which metrics reporter to use. You can do this by setting the metric.reporters property on the Kafka broker. Here is how you can do this via environment variables.
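A sketch assuming the apache/kafka Docker image, where a broker property maps to an environment variable by upper-casing it, prefixing it with KAFKA_, and replacing dots with underscores; the JAR and class names are the hypothetical ones from the examples above:

```bash
# Make the packaged reporter visible to the broker.
cp kip-714-reporter.jar $KAFKA_HOME/libs/

# Register the reporter (equivalent to metric.reporters=... in server.properties).
export KAFKA_METRIC_REPORTERS="io.example.kip714.OtlpMetricsReporter"
```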
This is it. I know it may seem like a lot of groundwork to get things working. But the good news is that you only need to implement this once. The same implementation can be reused across different Kafka clusters.

Shipping metrics to CloudWatch

Enough with the theory. Let's get you experimenting with KIP-714 in a hands-on demo. The demo contains an example implementation of KIP-714 where the collected client metrics are pushed to an OpenTelemetry collector, which in turn sends the metrics to Amazon CloudWatch. Everything in this demo is implemented using Docker, and the necessary setup happens for you behind the scenes. All you need to do is start the containers and generate some traffic.
To get things started, you must have the following pre-requisites installed on your computer.
Once you have everything installed, you can clone the repository that contains the code for this demo. Use the following command.
Change to the directory that contains the code.
Before you proceed, you must configure your AWS credentials using the AWS CLI. This is required because the code will try to push the collected client metrics to Amazon CloudWatch, and for this to happen, the code needs to authenticate with the service. The code will try to use the AWS credentials stored in your ~/.aws directory for the default profile. Follow the instructions detailed here to create your own set of credentials.
Now you can start the containers from this demo. The current directory contains a Docker Compose file that defines two containers: one called kafka that is your Kafka broker. The other one is called collector which is the OpenTelemetry collector functioning as a side-car to push the collected client metrics to the observability backend. Start these containers with the following command.
It may take several minutes for Docker to download the required images before starting the containers. But once that happens, you won't see nothing exciting going on. But trust me—there will be plenty going on behind the scenes. At this point, you will have your Kafka broker up and running, and with a topic named load-test already created. This is going to be the topic you will use to verify if the client metrics are being collected. Moreover, the Kafka broker will be already configured to allow for the collection of both producer and consumer metrics. All you need to do to make this happen is write and read records to/from Kafka.
Let's start by writing 50K records into the topic load-test. You don't need to write any client producer application for this. Kafka includes the kafka-producer-perf-test.sh utility tool exactly for this purpose. To load 50K records, with each record containing 1KB and sending 1000 records every second, use the following command.
Once this command completes, you should see an output similar to this one:
Followed by the producer client metrics and their values.
Now that records were written to the topic, you can consume them. Kafka also includes the kafka-consumer-perf-test.sh utility tool for this purpose. To read the 50K records, use the following command.
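Again a sketch with the tool's standard flags and an assumed bootstrap address:

```bash
kafka-consumer-perf-test.sh \
  --topic load-test \
  --messages 50000 \
  --bootstrap-server localhost:9092
```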
Once this command completes, you should see an output similar to this one:
Followed by the consumer client metrics and their values.
Done. At this point, both producer and consumer metrics were collected from the clients and pushed to Amazon CloudWatch. If your AWS CLI is properly configured, you can check this by listing all the metrics in the namespace kafka-kip-714. Note that the OpenTelemetry collector in this demo is configured to send all the metrics to the us-east-1 region.
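For example:

```bash
aws cloudwatch list-metrics --namespace kafka-kip-714 --region us-east-1
```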
You should see the following output:
If you see a similar output, it means the metrics were successfully stored in CloudWatch, and you can use its visualization tools to play with them. In fact, let's do this right now. Open the AWS console and navigate to the CloudWatch service here.
Go to the left menu and click All Metrics to open the metrics graph visualizer. You should see all the namespaces containing metrics from your AWS services and applications, including the kafka-kip-714 one.
If you know your way around creating graph visualizations for CloudWatch metrics, feel free to play with some of the stored metrics. But if you are new to CloudWatch and just want to see some nice graphs for the 50K records processed by Kafka, you can use the following code.
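A sketch of a metrics-source JSON for the two metrics discussed below; it assumes the metrics carry no extra dimensions (if your collector attaches dimensions, such as the topic name, they must be listed after each metric name):

```json
{
    "view": "gauge",
    "stat": "Maximum",
    "period": 300,
    "region": "us-east-1",
    "yAxis": { "left": { "min": 0, "max": 50000 } },
    "metrics": [
        [ "kafka-kip-714", "org.apache.kafka.producer.topic.record.send.total" ],
        [ "kafka-kip-714", "org.apache.kafka.consumer.fetch.manager.records.consumed.total" ]
    ]
}
```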
To use this code, click the Source option right below the metrics visualizer and paste the entire content.
After this, click the Update button. The graph visualizer should update with the following result.
These two metrics, displayed as gauges, show the number of records processed by Kafka. The first metric, org.apache.kafka.producer.topic.record.send.total, represents the number of records written, and the second, org.apache.kafka.consumer.fetch.manager.records.consumed.total, is the number of records read. Since all the metrics are available on a per-topic basis, you can create nice dashboards showing the details of each topic.

Summary

Your Kafka clients are as important as your Kafka clusters, and your observability strategy must treat them as symbionts. This is important because, without proper visibility into Kafka client metrics, it is hard to identify the culprit of performance problems. This blog post highlighted the importance of collecting both cluster-level and client-level metrics, and how KIP-714 helps with that.
It explained the design problems KIP-714 helps solve, such as the dependency on JMX and the need for a per-client metrics agent. With a much simpler metric collection architecture, KIP-714 allows you to pull client metrics with just a few changes. This blog post also highlighted what those changes are and how you can implement them in your own Kafka cluster.
All of this was shown in practice with a hands-on demo of sending client metrics to Amazon CloudWatch. Since KIP-714 is built on top of the OpenTelemetry metrics standard, you can easily customize the demo to use your own observability backend.

About the Author: Ricardo Ferreira works for AWS in the developer relations team. You can follow him on LinkedIn, Twitter, and Instagram.

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.
