My Event Is More Urgent than Yours: Prioritizing Event Processing with Apache Kafka
Apache Kafka doesn't support event processing prioritization. But you don't have to settle for this. Let's see how to solve this problem using the bucket priority pattern.
Working directly with partitions means giving up the simplicity of `subscribe()` and `consumer.poll()`, which require no awareness whatsoever about partitions. Giving up this simplicity increases the chances of creating code that is both hard to read and maintain.

With the bucket priority pattern, a topic's partitions are grouped into buckets. Imagine a topic with six partitions, split into two buckets: one called Platinum and another called Gold. The Platinum bucket is obviously bigger than Gold and thus can fit more events. Also, there are four consumers working on the Platinum bucket, whereas there are only two consumers working on the Gold bucket. Conceptually, events in the Platinum bucket tend to be processed first and faster than any message ending up in the Gold bucket. If you need strict prioritization, meaning that events from the Platinum bucket must be executed before the ones from the Gold bucket, you can configure your consumers to execute in that respective order.

Bucket sizes are expressed as percentages of the topic's partitions: for example, the bucket with a higher priority gets 70% of the allocation and the bucket with a lower priority gets 30%. In the previous example of the topic with six partitions, the bucket with a higher priority would initially have four partitions and the bucket with a lower priority would have two. If someone increases the number of partitions from six to twelve, the bucket with a higher priority would then have eight partitions and the bucket with a lower priority would have four.

To try the pattern out, start by cloning the code repository, using the branch `build-on-aws-blog`:
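The percentage-to-partition mapping is simple arithmetic, and a rough sketch helps make it concrete. The class name and the rounding rule below are assumptions for illustration, not the library's actual code:

```java
import java.util.Arrays;

// Illustrative sketch of mapping bucket percentages onto a partition count.
public class AllocationSketch {

    public static int[] allocate(int numPartitions, double[] ratios) {
        int[] counts = new int[ratios.length];
        int assigned = 0;
        for (int i = 0; i < ratios.length; i++) {
            counts[i] = (int) Math.round(numPartitions * ratios[i]);
            assigned += counts[i];
        }
        // Hand any rounding remainder to the highest-priority bucket
        counts[0] += numPartitions - assigned;
        return counts;
    }

    public static void main(String[] args) {
        double[] ratios = {0.70, 0.30};  // Platinum, Gold
        System.out.println(Arrays.toString(allocate(6, ratios)));   // [4, 2]
        System.out.println(Arrays.toString(allocate(12, ratios)));  // [8, 4]
    }
}
```

With six partitions the 70/30 split yields four and two partitions; doubling the topic to twelve partitions yields eight and four, matching the example above.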
```
git clone https://github.com/build-on-aws/prioritizing-event-processing-with-apache-kafka.git -b build-on-aws-blog
cd prioritizing-event-processing-with-apache-kafka
docker compose up -d
docker ps -a
```
Look for the container named `kafka` and check if its status is set to `healthy`. If the broker is healthy, you can proceed and build the project:

```
mvn clean package
```
Once the build finishes, a folder named `target` will be created. Inside this folder there will be a file named `bucket-priority-pattern-1.0.0-jar-with-dependencies.jar`, which contains the code of the bucket priority pattern, the example code, and all the dependencies required to execute this code properly.

Now run the producer application, which writes events to a topic named `orders-per-bucket`:

```
java -cp target/bucket-priority-pattern-1.0.0-jar-with-dependencies.jar blog.buildon.aws.streaming.kafka.BucketBasedProducer
```
Events will be written continuously to the `orders-per-bucket` topic. You should see an output like this:

```
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
Key 'Gold-1' was sent to partition 4
Key 'Platinum-2' was sent to partition 0
Key 'Gold-3' was sent to partition 5
Key 'Platinum-4' was sent to partition 1
Key 'Gold-5' was sent to partition 4
Key 'Platinum-6' was sent to partition 2
Key 'Gold-7' was sent to partition 5
Key 'Platinum-8' was sent to partition 3
Key 'Gold-9' was sent to partition 4
Key 'Platinum-10' was sent to partition 0
```
Next, start a consumer application to process events from the Platinum bucket. The code for this application was written to simulate multiple concurrent consumers using threads. For this reason, you specify how many threads you want to dedicate to this bucket:

```
java -cp target/bucket-priority-pattern-1.0.0-jar-with-dependencies.jar blog.buildon.aws.streaming.kafka.BucketBasedConsumer Platinum 5
```
You should see an output like this:

```
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
[Consumer-Thread-4] Key = Platinum-2, Partition = 0
[Consumer-Thread-3] Key = Platinum-4, Partition = 1
[Consumer-Thread-1] Key = Platinum-8, Partition = 2
[Consumer-Thread-4] Key = Platinum-10, Partition = 3
[Consumer-Thread-3] Key = Platinum-12, Partition = 0
[Consumer-Thread-2] Key = Platinum-14, Partition = 1
[Consumer-Thread-1] Key = Platinum-16, Partition = 2
[Consumer-Thread-4] Key = Platinum-18, Partition = 3
[Consumer-Thread-3] Key = Platinum-20, Partition = 0
[Consumer-Thread-2] Key = Platinum-22, Partition = 1
[Consumer-Thread-1] Key = Platinum-24, Partition = 2
```
In another terminal, do the same for the Gold bucket:

```
java -cp target/bucket-priority-pattern-1.0.0-jar-with-dependencies.jar blog.buildon.aws.streaming.kafka.BucketBasedConsumer Gold 5
```
You should see an output like this:

```
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
[Consumer-Thread-0] Key = Gold-13, Partition = 4
[Consumer-Thread-4] Key = Gold-15, Partition = 5
[Consumer-Thread-0] Key = Gold-17, Partition = 4
[Consumer-Thread-4] Key = Gold-19, Partition = 5
[Consumer-Thread-0] Key = Gold-21, Partition = 4
[Consumer-Thread-4] Key = Gold-23, Partition = 5
[Consumer-Thread-0] Key = Gold-25, Partition = 4
```
Note that, just as with the Platinum bucket, the number of threads actually processing events differs from what you specified. On the command line you asked for five threads dedicated to the Gold bucket, but only two of them are processing events. Let's understand why.

The topic `orders-per-bucket` was created with six partitions. The producer application is implemented in the Java class `blog.buildon.aws.streaming.kafka.BucketBasedProducer`. If you look at its code, you see that the first thing it does is create the topic, using Kafka's admin API:
```java
public static void main(String[] args) {
    createTopic(ORDERS_PER_BUCKET, 6, (short) 1);
    new BucketBasedProducer().run(getConfigs());
}
```
It then creates a `KafkaProducer<K,V>` with a configuration that enables the bucket priority pattern:
```java
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
    "code.buildon.aws.streaming.kafka.BucketPriorityPartitioner");
configs.put(BucketPriorityConfig.TOPIC_CONFIG, ORDERS_PER_BUCKET);
configs.put(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.put(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
}
```
Note that the Platinum bucket was configured with 70% of the events allocation, and the Gold bucket with the remaining 30%. With this configuration in place, the producer application continuously creates one event for each bucket, determining which bucket to use via the event key:
```java
AtomicInteger counter = new AtomicInteger(0);
String[] buckets = {"Platinum", "Gold"};
for (;;) {
    int value = counter.incrementAndGet();
    int index = Utils.toPositive(value) % buckets.length;
    String recordKey = buckets[index] + "-" + value;
    ProducerRecord<String, String> record =
        new ProducerRecord<>(ORDERS_PER_BUCKET, recordKey, "Value");
    producer.send(record, (metadata, exception) -> {
        System.out.println(String.format(
            "Key '%s' was sent to partition %d",
            recordKey, metadata.partition()));
    });
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
```
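The keys produced above (`Platinum-2`, `Gold-3`, and so on) are how the partitioner decides which bucket, and therefore which partition range, each event lands in. The sketch below illustrates that routing idea only; the real `BucketPriorityPartitioner` also reads the allocation config and adapts when partitions are added, and the class name, static layout, and per-bucket round-robin here are assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of bucket-based key routing for a six-partition topic.
public class BucketRoutingSketch {

    // Assumed static layout: Platinum owns partitions 0-3, Gold owns 4-5.
    private static final Map<String, int[]> BUCKETS = Map.of(
        "Platinum", new int[] {0, 1, 2, 3},
        "Gold", new int[] {4, 5});

    // One round-robin counter per bucket
    private static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();

    public static int partitionFor(String key) {
        String bucket = key.substring(0, key.indexOf('-'));  // "Gold-1" -> "Gold"
        int[] partitions = BUCKETS.get(bucket);
        int next = COUNTERS.computeIfAbsent(bucket, b -> new AtomicInteger(0))
                           .getAndIncrement();
        return partitions[next % partitions.length];
    }

    public static void main(String[] args) {
        for (String key : new String[] {"Gold-1", "Platinum-2", "Gold-3", "Platinum-4"}) {
            System.out.println("Key '" + key + "' -> partition " + partitionFor(key));
        }
    }
}
```

Run against the sample keys, this reproduces the shape of the producer output shown earlier: Gold keys alternate between partitions 4 and 5, while Platinum keys cycle through 0 to 3.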
Since the topic `orders-per-bucket` was created with six partitions, partitions `[0, 1, 2, 3]` were assigned to the Platinum bucket, covering roughly 70% of the events allocation, and partitions `[4, 5]` were assigned to the Gold bucket, covering the remaining 30%.

The consumer application is implemented in the Java class `blog.buildon.aws.streaming.kafka.BucketBasedConsumer`. If you look at its code, you see that during bootstrap it instantiates one thread for each consumer of a bucket:
```java
private void run(String bucketName, int numberOfThreads, Properties configs) {
    ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
    for (int i = 0; i < numberOfThreads; i++) {
        String threadName = String.format("Consumer-Thread-%d", i);
        executorService.submit(new ConsumerThread(bucketName, threadName, configs));
    }
}
```
For each thread, a `KafkaConsumer<K,V>` is created, also configured with the bucket priority pattern. The configuration is the same as in the producer application, since both must agree on the percentages of each bucket. The rest of the code is a plain old Kafka consumer implementation: it continuously polls for new events and processes them as they arrive:
```java
private class ConsumerThread implements Runnable {

    private String threadName;
    private KafkaConsumer<String, String> consumer;

    public ConsumerThread(String bucketName,
        String threadName, Properties configs) {
        this.threadName = threadName;
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
        configs.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, ORDERS_PER_BUCKET + "-group");
        // Configuring the bucket priority pattern
        configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "code.buildon.aws.streaming.kafka.BucketPriorityAssignor");
        configs.put(BucketPriorityConfig.TOPIC_CONFIG, ORDERS_PER_BUCKET);
        configs.put(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
        configs.put(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");
        configs.put(BucketPriorityConfig.BUCKET_CONFIG, bucketName);
        consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(ORDERS_PER_BUCKET));
    }

    @Override
    public void run() {
        for (;;) {
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("[%s] Key = %s, Partition = %d",
                    threadName, record.key(), record.partition()));
            }
        }
    }
}
```
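This also explains why only two of the five Gold threads ever print events: Kafka assigns each partition to at most one consumer in a group, so a bucket with two partitions can keep at most two consumers busy, and the remaining threads stay idle. A minimal illustration of that cap (the helper class below is hypothetical, not part of the repository):

```java
// Hypothetical helper: expresses Kafka's consumer-group rule that each
// partition is consumed by at most one group member at a time.
public class ActiveConsumersSketch {

    public static int activeConsumers(int threads, int partitionsInBucket) {
        return Math.min(threads, partitionsInBucket);
    }

    public static void main(String[] args) {
        System.out.println(activeConsumers(5, 4));  // Platinum: 4 of 5 threads busy
        System.out.println(activeConsumers(5, 2));  // Gold: 2 of 5 threads busy
    }
}
```

In other words, sizing the thread pool beyond a bucket's partition count buys no extra throughput; the bucket's allocation percentage, via its partition count, is the real ceiling.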