Apache Kafka Partitions as Unit of Durability

Learn what to consider when designing your Kafka replication strategy

Ricardo Ferreira
Amazon Employee
Published Mar 19, 2024

Part 4: Partitions as your unit-of-durability

Kafka as storage for your data streams wouldn't be very useful if it couldn't ensure data consistency. As with any distributed system, failures are a constant, and a well-designed architecture should expect them to happen at any time. Luckily for us, durability is built into Kafka's architecture: your data is replicated across the cluster and ultimately persisted on the brokers' disks.
Unlike other messaging systems, where replication is a feature you optionally enable, in Kafka replication is part of the core design: every partition is made up of one or more replicas. If you don't specify a replication factor while creating a topic, the topic is created with the cluster's default, which out of the box is a replication factor of one, meaning each partition has a single copy and no backup. Raise the replication factor and each partition gets additional replicas hosted on other brokers. As the name implies, the purpose of a replica is to serve as a backup for a partition in case the broker that hosts it fails. When that happens, a healthy broker holding one of the other replicas takes over, and the partition continues to be served from that replica's copy of the data. As you can see, the unit of replication in Kafka is not the individual event but the entire partition.
This is important because knowing it will allow you to better understand how many partition replicas actually exist in your cluster. That number is dictated by how many topics you have, the number of partitions set for each topic, and the replication factor configured for each of them. For example, if a topic is created with 4 partitions and a replication factor of 3, then 12 partition replicas will be created across the cluster. For a cluster with 4 brokers, it should look like what is shown in figure 3.
Figure 3: Replicas are evenly distributed between Kafka brokers.
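To make this concrete, a topic laid out like the one in figure 3 could be created with the same kafka-topics tool used later in this post; the broker endpoint is a placeholder and the topic name is just an example:

kafka-topics --bootstrap-server <BROKER_ENDPOINT> --create --topic replicasTest --partitions 4 --replication-factor 3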
The allocation is primarily focused on distributing the replicas evenly between the brokers, but this is not the only criterion. Starting from Kafka 0.10.0, if the brokers are configured with information about the rack they belong to, the allocation also takes this into consideration and assigns a partition's replicas to different racks as much as possible. This is useful to ensure that if an entire rack fails, your data is safe and sound in another rack. For cloud-based deployments, building blocks such as availability zones should be used as racks.
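Rack awareness is driven by a broker-level property. As a minimal sketch, each broker's server.properties would carry something like the line below, where the rack identifier is just an example availability zone name:

# server.properties (use a different value per rack or availability zone)
broker.rack=use1-az1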

Leaders and followers

Now that you know the relationship between partitions and replicas, know that there are two types of replicas: leaders and followers. Leader replicas are the ones responsible for processing write requests. When a producer sends events to Kafka, the leader replica is the one that writes them to the partition's log to guarantee consistency. From this perspective, leader replicas are effectively your actual partitions. They are elected by a component of the cluster called the controller, which is a broker with the additional responsibility of handling leader elections. The other replicas, called followers, have the job of staying up-to-date with the leader in terms of event data. They do this by replicating all events from the leader. A follower that stays up-to-date with the leader is marked as an ISR, an acronym for in-sync replica. ISRs are super important because if a leader replica dies, one of the ISRs will be promoted to be the new leader for the partition. Followers that fall behind the leader for longer than the value specified in the property replica.lag.time.max.ms are marked as out-of-sync replicas. Anecdotally, nobody uses OSR as an acronym. 🤷🏻
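For reference, that lag threshold is a broker-level setting. A minimal sketch of it in server.properties, assuming the 30-second default used by recent Kafka versions:

# followers that fail to catch up within this window are removed from the ISR list
replica.lag.time.max.ms=30000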
ISRs are also very important in the context of data consistency when producers write events to Kafka. This is controlled by the acks property of the producer configuration. Setting this property to 0 tells the producer not to wait for anything after events are sent to the brokers: the acknowledgment comes back right away, and no guarantee is provided. Setting it to 1 guarantees that at least the leader replica has stored the event before the acknowledgment is returned. Finally, setting it to all guarantees that all ISRs have stored the event before the acknowledgment is returned. While it may be tempting to set acks to all at all times, be aware that doing so also increases producer latency, meaning the time it takes for the producer to write events to Kafka. For end-to-end latency, though, this isn't much of a problem: to ensure data consistency, Kafka doesn't allow consumers to read events that haven't been written to all ISRs anyway.
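To make this concrete, here is a minimal Java producer sketch showing where acks is set; the broker endpoint, topic name, and record contents are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BROKER_ENDPOINT>");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "0" = fire-and-forget, "1" = leader only, "all" = every in-sync replica
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a Future; get() blocks until the acknowledgment comes back
            producer.send(new ProducerRecord<>("replicasTest", "key", "value")).get();
        }
    }
}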
Speaking of consumers, they may also leverage replicas. When a consumer polls for events, a fetch request is sent to the broker that hosts the leader replica, which, as mentioned before, handles all writes. However, since the introduction of KIP-392 in Kafka 2.4, follower replicas can also serve read requests, which helps decrease network traffic costs by letting consumers fetch events from the nearest replica instead of the leader. Consequently, this also alleviates the leader replica's work, as it can be dedicated to handling more writes and fewer (ideally no) reads.
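Here is a minimal sketch of the consumer side of this feature. It assumes the brokers are configured with broker.rack and replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector; the endpoint, group id, topic, and rack id below are placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BROKER_ENDPOINT>");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replicas-test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Tell the brokers which "rack" (for example, an availability zone) this
        // consumer runs in, so fetches can be served by the closest replica.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "use1-az1");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("replicasTest"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}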
As you can see, replicas play an important role in how Kafka ensures data consistency. Therefore, it is important to know how to keep track of the replicas in your cluster. The kafka-topics tool, available in the /bin folder of your Kafka distribution, can be used to investigate a topic's partitions and replicas, and to see which replicas are ISRs.
kafka-topics --bootstrap-server <BROKER_ENDPOINT> --topic <TOPIC_NAME> --describe
This command will produce an output similar to this:
Topic: replicasTest TopicId: VAE9FZCOTH2OlLLM8Mn82A PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: replicasTest Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Offline:
Topic: replicasTest Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Offline:
Topic: replicasTest Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Offline:
Topic: replicasTest Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Offline:

Busy clusters with replication

Why aren't all replicas ISRs, you may ask? A replica may become out-of-sync with the leader for reasons such as network congestion, or it may fall behind because its broker is too busy. This matters because you should never forget that replicating entire partitions over the network is computationally expensive. When creating partitions, keep in mind that they will have replicas, and those replicas will be distributed over the cluster. If you create more partitions than your cluster was sized to handle, you can create contention problems, with brokers busier replicating partitions than actually storing them and serving them for reads. This happens because the way Kafka handles replication is not very different from how consumers read data.
To stay in sync, brokers hosting follower replicas send fetch requests to the brokers that host the leader of the partition. These are the same fetch requests that consumers send when the consumer's poll() function is invoked, and because it is the same request type, both are served by the same request processing model. Here is how it works: fetch requests are received by a thread from a pool called the network threads, which are associated with each broker's listener port. Once a network thread finishes receiving a request, it adds the request to an in-memory FIFO queue that will eventually be processed by a thread from another pool called the I/O threads. Figure 4 shows this in more detail.
Figure 4: Kafka's request processing model.
If you create more replicas than your cluster can handle, you may exhaust both the pool of network threads and the pool of I/O threads with fetch requests related to replication. The default size of the network thread pool is 3, and the default size of the I/O thread pool is 8. In theory, a naïve solution to the problem of busy threads is to increase these pools to higher numbers. However, doing so doesn't solve the problem. In fact, it may actually make the situation worse, as more threads mean more context switching between CPUs, especially if the broker node doesn't have enough of them. Having partitions as your unit of durability is a good thing for the sake of data consistency, but you must use this information wisely to avoid problems with busy clusters.
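For reference, both pools are sized through broker properties. This server.properties sketch shows their out-of-the-box values, which you should only raise after confirming the broker has spare CPU capacity:

# server.properties
num.network.threads=3
num.io.threads=8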

Summary

In this series, I discussed how partitions affect your system if you configure them incorrectly. The role partitions play in Kafka was explored through common traps, such as bottlenecks caused by running out of file handles, cluster storage not being fully leveraged, decreased event processing speed, and busy clusters caused by replication. The discussion around these traps gives you a good understanding of how Kafka handles your data using partitions. I also explained the impact that partitions have on many aspects of the system, such as consistency, parallelism, and durability.
As I mentioned before, partitions are the heart of everything Kafka does. They store your data, they allow parallel reads to speed up your event processing, and they also ensure that your data is not lost when brokers fail. I mean, yeah, you can surely pretend that everything in Kafka boils down to topics, the story ends, you wake up in your bed, and you believe whatever you want to believe. But I want to help you, so I recommend you take the red pill and stay in Wonderland, so I can show you how deep the rabbit hole goes. So please, don't forget to practice everything you read in this series to build the right scars with Kafka partitions.
 

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.
