Apache Kafka Partitions as Unit of Storage

Understanding how to manage Kafka's storage at the partition level

Ricardo Ferreira
Amazon Employee
Published Mar 19, 2024
Last Modified Mar 20, 2024

Part 3: Partitions as the unit-of-storage

Despite what most people think, Kafka doesn't store data in topics. Physically, topics are just folders on the broker's node used to organize the files that make up a partition on disk. Every new event written into a "topic" is actually written into a partition, which comprises multiple log segment files and their indexes. By default, each log segment file can grow up to 1GB in size, which is the default value of the log.segment.bytes broker setting. When this 1GB watermark is reached, Kafka flushes all events to disk and creates a new log segment file, along with its index file. New events are then appended to this new log segment file. For each log segment and index file, the broker has to keep a file handle open, as read requests may need to be served. If you have worked with infrastructure before, you know how file handles affect scaling. Having many file handles open on a single node is a likely source of bottlenecks.
To illustrate this bottleneck scenario, let's say you have a broker running on a node capable of creating 1024 file handles, which is the typical default limit that Linux systems have per process. Then, you create a topic with only 1 partition. All partitions are created atomically, so once this topic is created, one partition will be active on that broker, with one log segment holding 2 file handles: one for the log segment file and another for its index. A producer writes into this topic continuously, at about 250MB of data per minute. At that pace a new 1GB segment is rolled roughly every 4 minutes, and the 512 segments that fit within 1024 file handles fill up in about 35 hours. This means that in less than 48 hours, this broker will run out of file handles if the producer continues to write data at that pace.
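The arithmetic behind this scenario can be checked with a small sketch. It assumes the figures from the paragraph above (1024 handles, 2 handles per segment, 1GB segments, 250MB per minute), assumes that no retention policy deletes old segments, and ignores every other file or socket the broker keeps open. The class and method names are made up for illustration.

```java
final class FileHandleEstimate {

    /** Log segments that fit under the handle limit, ignoring any other open file or socket. */
    static long segmentsUntilLimit(long handleLimit, long handlesPerSegment) {
        return handleLimit / handlesPerSegment;
    }

    /** Minutes of sustained writes needed to fill that many segments. */
    static double minutesUntilLimit(long handleLimit, long handlesPerSegment,
                                    long segmentSizeMb, long writeRateMbPerMinute) {
        long segments = segmentsUntilLimit(handleLimit, handlesPerSegment);
        return (double) (segments * segmentSizeMb) / writeRateMbPerMinute;
    }

    public static void main(String[] args) {
        // Figures from the scenario: 1024 handles, 2 per segment, 1GB segments, 250MB per minute.
        long segments = segmentsUntilLimit(1024, 2);
        double hours = minutesUntilLimit(1024, 2, 1024, 250) / 60.0;
        // Prints: 512 segments, about 35 hours
        System.out.println(segments + " segments, about " + Math.round(hours) + " hours");
    }
}
```

Under these assumptions the limit is reached comfortably inside the two-day window, and doubling the partition count on the same broker would halve that time.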
Surely, you could take different actions to remediate this bottleneck scenario, such as decreasing the retention policy for the topic, running multiple brokers in different processes, increasing the limit of file handles per process, or simply adding more brokers on different nodes. But the point of this scenario is to show that creating partitions arbitrarily has a direct impact on the amount of resources consumed by your Kafka brokers. If a topic with 1 partition could exhaust the broker's node in less than 3 days, what would happen if the topic had more partitions? Every partition takes a toll on the brokers, because partitions, not topics, are your unit-of-storage.

Storage from the developer perspective

While all of this happens at the infrastructure level, it may be hard for you, as a developer, to think about partitions. Have you ever wondered why you never had to worry about partitions while writing and reading events with Kafka? Well, Kafka is a magnificent piece of technology when it comes to client APIs. From the producer standpoint, every time you invoke the send() function to write a new event, an internal process is triggered that takes care of deciding which partition that event should go to. This process is known as partitioning, and it can be customized by a component called the partitioner.
Here is how partitioning works. If the event produced has a partition assigned, then it will use it. Otherwise, if a custom partitioner was configured via the partitioner.class property, then it will execute this partitioner to compute which partition to use. If there is no custom partitioner configured, then Kafka tries to compute which partition to use based on the key assigned. The partitionForKey() method below describes how the key is used along with the number of available partitions.
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
If the event doesn't have a key, or if the property partitioner.ignore.keys is set to true, then Kafka falls back to computing which partition to use based on factors such as broker load, the amount of data produced to each partition, etc. This partitioning behavior was introduced by KIP-794.
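The order of these decisions can be summarized in a small, dependency-free sketch. It is not the producer's actual code: the class, the method names, and the fallbackPartition parameter are hypothetical, and Arrays.hashCode is only a stand-in for murmur2, so the partition numbers it yields will not match what a real producer computes. The toPositive helper clears the sign bit, which is what keeps the modulo result non-negative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.ToIntFunction;

final class PartitionChoiceSketch {

    /** Clears the sign bit so the later modulo never yields a negative partition. */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /** Key-based choice. Arrays.hashCode is a stand-in here, NOT Kafka's murmur2. */
    static int partitionForKey(byte[] serializedKey, int numPartitions) {
        return toPositive(Arrays.hashCode(serializedKey)) % numPartitions;
    }

    /**
     * Mirrors the order described in the text: explicit partition, then custom
     * partitioner, then key hash, then the built-in fallback. The fallback is
     * passed in as a plain number because its real logic lives inside the producer.
     */
    static int choosePartition(Integer explicitPartition,
                               ToIntFunction<byte[]> customPartitioner,
                               byte[] serializedKey,
                               boolean ignoreKeys,
                               int numPartitions,
                               int fallbackPartition) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        if (customPartitioner != null) {
            return customPartitioner.applyAsInt(serializedKey);
        }
        if (serializedKey != null && !ignoreKeys) {
            return partitionForKey(serializedKey, numPartitions);
        }
        return fallbackPartition;
    }

    public static void main(String[] args) {
        byte[] key = "customer-17".getBytes(StandardCharsets.UTF_8);
        // Same key, same partition count: the chosen partition is always the same.
        System.out.println(choosePartition(null, null, key, false, 6, 0));
    }
}
```

The practical consequence of the key-based branch is that the result depends on the number of partitions, so adding partitions to a topic changes where events with an existing key land.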
Consumers work similarly. Every time you invoke the poll() function from the consumer to read events, the events are read from partitions selected by an internal process, triggered beforehand, that takes care of deciding how to assign the partitions to consumers. This process is known as assignment, and it can be customized by a component called the assignor. To better understand how assignors work, you need to understand how Kafka handles consumers. All consumers must belong to a consumer group. This is the reason the group.id property in the consumer API is mandatory. Every group has a group coordinator, which oversees who joins and leaves the group.
Once one or more consumers join the group, the group coordinator drives the partition assignment, which is computed by the configured assignors. This partition assignment can be triggered by discrete events. It may happen when a consumer dies, perhaps because of a hardware or software malfunction. It may also happen if the consumer becomes so busy that it stops responding to the heartbeat checks that verify its liveness. And it may also happen if a brand new consumer joins the group. By the way, this assignment process is also known as rebalancing.
Consumers in Kafka come with a list of assignors configured. It defaults to the RangeAssignor, which tries to distribute the available partitions evenly amongst the range of consumers. If the partitions of a topic don't divide evenly among the consumers, then the first consumers from the list (which is sorted lexicographically) receive one extra partition each. The assign() method below from the RangeAssignor implementation describes how this works.
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions)
{
    Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);

    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<>());

    for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<MemberInfo> consumersForTopic = topicEntry.getValue();

        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        Collections.sort(consumersForTopic);

        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}
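To see the start and length arithmetic from the excerpt in action, here is a simplified sketch that applies the same two formulas to a single topic. It uses plain strings for member ids and integers for partitions instead of Kafka's MemberInfo and TopicPartition types, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeSplitSketch {

    /** Splits partitions 0..numPartitions-1 of one topic over the sorted member ids. */
    static Map<String, List<Integer>> split(List<String> memberIds, int numPartitions) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return assignment;
        }
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();

        for (int i = 0; i < sorted.size(); i++) {
            // Same formulas as the excerpt: earlier consumers absorb the remainder.
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                owned.add(p);
            }
            assignment.put(sorted.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Prints: {consumer-a=[0, 1, 2], consumer-b=[3, 4], consumer-c=[5, 6]}
        System.out.println(split(Arrays.asList("consumer-b", "consumer-a", "consumer-c"), 7));
    }
}
```

With 7 partitions and 3 consumers, the remainder is 1, so only the first consumer in sorted order receives an extra partition. Because this split is computed per topic, the same leading consumers pick up the extra partition for every topic they subscribe to.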
While the RangeAssignor does a good enough job distributing the partitions amongst the consumers, it doesn't provide any safety measures to prevent stop-the-world rebalances. This is a problem because it prevents continuous processing of events: the rebalance first needs to finish before the consumers resume their operations and fetch new events from the partitions. For this reason, if you're using newer Kafka clusters, the clients should prefer the CooperativeStickyAssignor. This assignor uses cooperative rebalancing to prevent stop-the-world pauses, and focuses on moving only the partitions changing ownership, instead of reassigning all the partitions all over again. The assign() method below from the CooperativeStickyAssignor implementation describes how this works.
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions)
{
    Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);

    Map<TopicPartition, String> partitionsTransferringOwnership = super.partitionsTransferringOwnership == null ?
        computePartitionsTransferringOwnership(subscriptions, assignments) :
        super.partitionsTransferringOwnership;

    adjustAssignment(assignments, partitionsTransferringOwnership);
    return assignments;
}

private void adjustAssignment(Map<String, List<TopicPartition>> assignments,
                              Map<TopicPartition, String> partitionsTransferringOwnership)
{
    for (Map.Entry<TopicPartition, String> partitionEntry : partitionsTransferringOwnership.entrySet()) {
        assignments.get(partitionEntry.getValue()).remove(partitionEntry.getKey());
    }
}

private Map<TopicPartition, String> computePartitionsTransferringOwnership(Map<String, Subscription> subscriptions,
                                                                           Map<String, List<TopicPartition>> assignments)
{
    Map<TopicPartition, String> allAddedPartitions = new HashMap<>();
    Set<TopicPartition> allRevokedPartitions = new HashSet<>();

    for (final Map.Entry<String, List<TopicPartition>> entry : assignments.entrySet()) {
        String consumer = entry.getKey();

        List<TopicPartition> ownedPartitions = subscriptions.get(consumer).ownedPartitions();
        List<TopicPartition> assignedPartitions = entry.getValue();

        Set<TopicPartition> ownedPartitionsSet = new HashSet<>(ownedPartitions);
        for (TopicPartition tp : assignedPartitions) {
            if (!ownedPartitionsSet.contains(tp))
                allAddedPartitions.put(tp, consumer);
        }

        Set<TopicPartition> assignedPartitionsSet = new HashSet<>(assignedPartitions);
        for (TopicPartition tp : ownedPartitions) {
            if (!assignedPartitionsSet.contains(tp))
                allRevokedPartitions.add(tp);
        }
    }

    allAddedPartitions.keySet().retainAll(allRevokedPartitions);
    return allAddedPartitions;
}
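The effect of the excerpt is easier to follow with a concrete case. The sketch below re-creates the same two steps with plain strings standing in for TopicPartition and for consumer ids. The class and method names are hypothetical, and the scenario (one consumer owning four partitions when a second consumer joins) is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OwnershipTransferSketch {

    /** Partitions that one consumer gains AND another consumer must give up first. */
    static Map<String, String> transferring(Map<String, List<String>> owned,
                                            Map<String, List<String>> target) {
        Map<String, String> added = new HashMap<>();
        Set<String> revoked = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : target.entrySet()) {
            String consumer = entry.getKey();
            List<String> ownedNow = owned.getOrDefault(consumer, Collections.emptyList());
            for (String tp : entry.getValue()) {
                if (!ownedNow.contains(tp)) added.put(tp, consumer);
            }
            for (String tp : ownedNow) {
                if (!entry.getValue().contains(tp)) revoked.add(tp);
            }
        }
        // Keep only partitions that are still owned by somebody else.
        added.keySet().retainAll(revoked);
        return added;
    }

    /** Removes in-transit partitions from their new owner for this rebalance round. */
    static Map<String, List<String>> withhold(Map<String, List<String>> target,
                                              Map<String, String> transferring) {
        Map<String, List<String>> adjusted = new HashMap<>();
        for (Map.Entry<String, List<String>> e : target.entrySet()) {
            adjusted.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        for (Map.Entry<String, String> e : transferring.entrySet()) {
            adjusted.get(e.getValue()).remove(e.getKey());
        }
        return adjusted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("consumer-a", Arrays.asList("t-0", "t-1", "t-2", "t-3"));
        owned.put("consumer-b", Collections.emptyList());

        Map<String, List<String>> target = new HashMap<>();
        target.put("consumer-a", Arrays.asList("t-0", "t-1"));
        target.put("consumer-b", Arrays.asList("t-2", "t-3"));

        Map<String, String> moving = transferring(owned, target);
        System.out.println("in transit: " + moving);
        System.out.println("this round: " + withhold(target, moving));
    }
}
```

In this scenario consumer-a keeps t-0 and t-1 without interruption, while t-2 and t-3 are withheld from consumer-b in this round. They can be handed over in a follow-up rebalance once consumer-a has released them, which is what avoids pausing every partition at once.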
How your code writes events into partitions and reads from them is one of those processes that are so well encapsulated that developers rarely pay attention to it. It's the reason I call Kafka's client API a magnificent piece of technology. It makes you believe you are dealing with this high-level construct called topic, but in reality, the whole partition plumbing is being handled behind the scenes. This is even more true when you are working with integration platforms like Kafka Connect, or stream processing technologies such as Kafka Streams, ksqlDB, and Amazon Kinesis Data Analytics.
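Because the plumbing is encapsulated, opting into the cooperative assignor discussed earlier is a configuration change rather than a code change. The sketch below only builds the properties, using java.util.Properties so it runs without the Kafka client on the classpath. The helper name, the broker address, and the group name are placeholders, and the string deserializers are an assumption about the event format.

```java
import java.util.Properties;

final class CooperativeConsumerConfigSketch {

    /** Builds consumer settings; the caller would pass them to a KafkaConsumer. */
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Every consumer that subscribes to topics belongs to a group.
        props.setProperty("group.id", groupId);
        // Replaces the default assignor list with the cooperative one.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder endpoint and group name.
        consumerProperties("localhost:9092", "orders-readers")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

If a group is already running with a different assignor, switching it while consumers are live generally requires a staged rollout, so treat this as a starting point rather than a drop-in change.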

Storage from the Ops perspective

The way your data is distributed across the cluster is another way to see partitions as a unit-of-storage. To implement horizontal scalability, each broker from the cluster hosts one or multiple partitions. As you add new brokers to your cluster, you increase the storage capacity of the cluster to store events, as the total storage capacity of a cluster is dictated by the sum of all brokers' storage. With a cluster containing 4 brokers, each one with the storage capacity of 1TB, you can store up to 4TB of data. But how your data will leverage all this capacity depends directly on the partitions.
To illustrate this, let's say you start a cluster with 2 brokers. Once the cluster is up-and-running, you create a topic with 4 partitions. As mentioned before, partitions are created atomically, so after the topic creation, the partitions are distributed across the cluster. Here, each broker will host 2 partitions. With this setup ready, your producers can start writing events and they will be distributed over those partitions if no keys are used. Eventually, you may start noticing that the brokers are reaching their maximum disk capacity, and you don't plan to get rid of data. Your solution is to add 2 more brokers, so now you have a cluster with 4 brokers, doubling your storage capacity. The most likely symptom you may notice after this is that your clients will get slower. Why is this happening? They are supposed to get faster, not slower.
This happens because partitions are not automatically reassigned as brokers are added to or removed from a cluster. So in this case, the partitions may still live on the 2 original brokers and, with more data coming in, those brokers may become slow as they start running out of disk space, file handles, or memory, swapping frequently, etc. To resolve this problem, you have two choices. You can bounce the cluster so that when it comes back, the partitions are reassigned, but this leads to cluster unavailability. The other option is to force this reassignment while the cluster is online using the kafka-reassign-partitions tool available in the /bin folder of your Kafka distribution. You can do this by first generating a reassignment recommendation given the new layout of your cluster.
kafka-reassign-partitions.sh --bootstrap-server <BROKER_ENDPOINT> --broker-list 2,3 --topics-to-move-json-file partitions-to-reassign.json --generate
In this command, the --broker-list option was set to 2,3, which corresponds to the broker IDs of the newly added brokers. The partitions-to-reassign.json file provided as a parameter is a file you must create yourself, and it should list the one or more topics whose partitions you intend to reassign. You should create this file using the following syntax:
{
  "topics": [
    {
      "topic": "<TOPIC_NAME>"
    }
  ],
  "version": 1
}
Once the command completes, you should see output similar to this:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"test","partition":3,"replicas":[1],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"test","partition":2,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":3,"replicas":[2],"log_dirs":["any"]}]}
Save the contents of the proposed partition reassignment configuration into a new file. You may tweak the file with your own proposal, or just settle for what was recommended by Kafka. To execute the reassignment, use the command below:
kafka-reassign-partitions.sh --bootstrap-server <BROKER_ENDPOINT> --reassignment-json-file new-reassignment-file.json --execute
This command may take a while to complete, depending on how large your partitions are, your network bandwidth, and how many partitions you are reassigning. Eventually, assuming the command didn't fail, the partitions will become available across all the brokers you selected in the reassignment JSON file. The point that matters here is that you won't leverage the full capacity of your cluster if you don't have your data distributed across the cluster, and this happens at the partition level.
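The manual step of copying the proposed reassignment out of the tool's output, and checking how it spreads partitions over brokers, can be scripted. The sketch below is one possible approach. It assumes the output has the shape shown earlier, with the JSON on the line after the "Proposed partition reassignment configuration" header, and it uses a regular expression instead of a JSON parser to stay dependency-free. The class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ReassignmentPlanSketch {

    private static final String PROPOSED_HEADER = "Proposed partition reassignment configuration";
    private static final Pattern REPLICAS = Pattern.compile("\"replicas\"\\s*:\\s*\\[([0-9,\\s]*)\\]");

    /** Returns the first non-blank line that follows the "Proposed ..." header. */
    static String proposedJson(String toolOutput) {
        boolean afterHeader = false;
        for (String line : toolOutput.split("\\R")) {
            if (afterHeader && !line.trim().isEmpty()) {
                return line.trim();
            }
            if (line.trim().startsWith(PROPOSED_HEADER)) {
                afterHeader = true;
            }
        }
        throw new IllegalArgumentException("No proposed reassignment found in the output");
    }

    /** Counts how many partition replicas each broker id would host. */
    static Map<Integer, Integer> replicasPerBroker(String json) {
        Map<Integer, Integer> counts = new TreeMap<>();
        Matcher matcher = REPLICAS.matcher(json);
        while (matcher.find()) {
            for (String id : matcher.group(1).split(",")) {
                if (!id.trim().isEmpty()) {
                    counts.merge(Integer.parseInt(id.trim()), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Shortened sample in the same shape as the output shown earlier.
        String output = "Current partition replica assignment\n"
                + "{\"version\":1,\"partitions\":[{\"topic\":\"test\",\"partition\":0,\"replicas\":[0]}]}\n"
                + "\n"
                + "Proposed partition reassignment configuration\n"
                + "{\"version\":1,\"partitions\":[{\"topic\":\"test\",\"partition\":0,\"replicas\":[3]},"
                + "{\"topic\":\"test\",\"partition\":1,\"replicas\":[2]}]}\n";
        String json = proposedJson(output);
        // Prints: {2=1, 3=1}
        System.out.println(replicasPerBroker(json));
    }
}
```

The extracted string is what you would save into the file passed to --reassignment-json-file. A per-broker count that leaves some brokers at zero is a hint that the proposal still doesn't use the whole cluster.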
 

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.
