Data processing with Kafka Streams - An overview of stateless operations
Learn stateless operations in Kafka Streams with practical examples.
Apache Kafka serves as a key component in data architectures, with a rich ecosystem for building scalable, data-intensive services such as data pipelines.
- Kafka (Producer and Consumer) client APIs allow you to choose from a variety of programming languages to produce and consume data from Kafka topics.
- You can integrate heterogeneous data systems using Kafka Connect which has an extensive suite of pre-built connectors and a framework that allows you to build custom integrations.
- You can use Kafka Streams (Java library) to develop streaming applications to process data flowing through Kafka topics.
Common requirements in data processing include filtering data, transforming it from one form to another, and applying an action to each record. These are often categorized as stateless operations. Kafka Streams is an ideal candidate if you want to apply stateless transformations to streaming data in Kafka. The KStream abstraction (part of the Kafka Streams DSL API) offers functions such as filter, map, groupBy, etc.
In this blog post, you will get an overview of these stateless operations along with practical examples and code snippets. I have grouped them into the following categories: map, filter, group, and terminal, along with some miscellaneous features.
To work with Kafka Streams, you need to start by creating an instance of KafkaStreams that serves as the entry point of your stream processing application. It needs a Topology along with a java.util.Properties object for additional configuration.
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
A Topology is what defines your stream processing application - it's an acyclic graph of sources, processors, and sinks. Once you have defined the Topology, create the KafkaStreams instance and start processing.
//details omitted
Topology topology = ....;
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
//block
new CountDownLatch(1).await();
Let's dive into the specifics of the Kafka Streams APIs which implement these stateless operations.
map is a commonly used stateless operation which can be used to transform each record in the input KStream by applying a mapper function. It is available in multiple flavors: map, mapValues, flatMap, and flatMapValues. For example, to convert key and value to uppercase, use the map method:
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
    }
});
If you only need to alter the value, use mapValues:
stream.mapValues(value -> value.toUpperCase());
flatMap is similar to map, but it allows you to return multiple records (KeyValues). Say you have a stream of records (foo <-> a,b,c), (bar <-> d,e) etc., where foo and bar are keys and "a,b,c", "d,e" are CSV string values. You want the resulting stream to be as follows: (foo,a), (foo,b), (foo,c), (bar,d), (bar,e). Here is an example of how this can be achieved:
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csvRecord) {
        String[] values = csvRecord.split(",");
        return Arrays.asList(values)
                .stream()
                .map(value -> new KeyValue<>(k, value))
                .collect(Collectors.toList());
    }
});
Each record in the stream gets flatMapped such that each CSV (comma-separated) value is first split into its constituents, and a KeyValue pair is created for each part of the CSV string.
There is also flatMapValues in case you only want to accept a value from the stream and return a collection of values.
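For instance, the CSV example above could be expressed more concisely with flatMapValues when the key does not need to change - a minimal sketch, assuming the same stream of String keys and CSV String values:
//split each CSV value into its parts; the original key is retained for each resulting record
stream.flatMapValues(csvRecord -> Arrays.asList(csvRecord.split(",")));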
Say the values in a topic are words and you want to include only those longer than a specified length. You can use filter, since it allows you to include records based on a criterion defined using a Predicate. The result is a new KStream instance with the filtered records:
KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
public boolean test(String k, String v) {
return v.length() > 5;
}
})
filterNot lets you exclude records based on a criterion. Here is an example (lambda style):
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));
Grouping is often a prerequisite to stateful aggregations in Kafka Streams. To group records by their key, you can use groupByKey as follows:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KGroupedStream<String,String> kgs = stream.groupByKey();
groupByKey returns a KGroupedStream object.
groupBy is a generic version of groupByKey which gives you the ability to group based on a different key using a KeyValueMapper:
stream.groupBy(new KeyValueMapper<String, String, String>() {
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});
Both groupByKey and groupBy allow you to specify a different Serde (Serializer and Deserializer) instead of the default ones. Just use the overloaded version which accepts a Grouped object:
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));
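groupBy works the same way - here is a minimal sketch of the overload that takes both a KeyValueMapper and a Grouped object (the choice of Serdes is purely illustrative):
stream.groupBy(
        (key, value) -> value,                           //re-key the stream by its value
        Grouped.with(Serdes.String(), Serdes.String())); //explicit Serdes for the re-keyed stream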
Not all stateless computations return intermediate results such as a KStream, KTable, etc. Those that don't are often called terminal operations, and their methods return void. Let's look at a few examples.
You may want to write the results of a stateless operation back to Kafka - most likely to a different topic. You can use the to method to store the records of a KStream in a topic in Kafka.
KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
.to("uppercase-words");
An overloaded version of to allows you to specify a Produced object to customize the Serdes and partitioner:
stream.mapValues(value -> value.toUpperCase())
.to("output-topic",Produced.with(Serdes.Bytes(), Serdes.Long()));
You are not limited to a fixed/static topic name. TopicNameExtractor allows you to include custom logic to choose a specific topic in a dynamic manner. Here is a simplified example - say you are using a map operation to convert each record to uppercase, and need to store it in a topic that has the original name with _uppercase appended to it:
stream.mapValues(value -> value.toUpperCase())
        .to(new TopicNameExtractor<String, String>() {
            public String extract(String k, String v, RecordContext rc) {
                return rc.topic() + "_uppercase";
            }
        });
In this example, we make use of RecordContext (which contains the record metadata) to get the name of the source topic.
In all of the above cases, the sink/target topic should already exist in Kafka.
print is useful for debugging purposes - you can log each record in the KStream. It also accepts an instance of Printed to configure the behavior.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());
This will print out the records. For example, if you pass (foo, bar) and (john, doe) to the input topic, they will get converted to uppercase and logged as follows:
[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE
You can also use Printed.toFile (instead of toSysOut) to target a specific file.
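A quick sketch of what that might look like (the file path is just illustrative):
stream.mapValues(v -> v.toUpperCase())
        .print(Printed.toFile("/tmp/uppercase-words.txt"));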
foreach is yet another terminal operation, but it accepts a ForeachAction so you can specify what you want to do with each record in the KStream.
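Here is a minimal sketch - the action simply logs each record, but any side effect could go in its place:
stream.foreach(new ForeachAction<String, String>() {
    public void apply(String k, String v) {
        System.out.println("key=" + k + ", value=" + v);
    }
});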
Here are some other useful operations offered by the Kafka Streams API:
Since print is a terminal operation, you no longer have access to the original KStream. This is where peek comes in handy, because it returns the same KStream instance. Just like foreach, it accepts a ForeachAction which you can use to specify what you want to do for each record.
The flexibility that peek offers means that not only can you log each key-value pair, but you can also materialize them to an output topic (unlike with the print operation) using the same chain of method calls:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.peek((k,v) -> System.out.println("key="+k+", value="+v))
.to(OUTPUT_TOPIC);
While developing your processing pipelines with the Kafka Streams DSL, you will find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic. Say you want to transform records, store the result in a topic, and then continue to process the new (transformed) records. Here is how you can get this done:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
//continue processing with stream2
stream2.filter((k,v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);
Since to is a terminal operation, a new KStream (stream2) had to be created. The through method can help simplify this. You can rewrite the above using a single chain of calls:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.through(OUTPUT_TOPIC)
.filter((k,v) -> v.length() > 5)
.to(LENGTHY_WORDS_TOPIC);
Say you have streaming data coming into two different Kafka topics, each of which is represented by a KStream. You can merge the contents of these KStreams into a single stream.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");
Caveat: there is no ordering guarantee between records coming from the two source streams, although the relative order within each input stream is preserved.
With the help of a KeyValueMapper, selectKey allows you to derive a new key with a different data type.
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<Integer, String, String>() {
    public String apply(Integer k, String v) {
        return Integer.toString(k);
    }
});
branch seems quite interesting, but it's something I have not used a lot (to be honest!). You can use it to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and produce multiple KStreams (an array) as output. The key differentiator is that you can use multiple Predicates instead of a single one, as is the case with filter and filterNot.
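Here is a minimal sketch of how branch could be used (the topic names are hypothetical). Each record ends up in the stream for the first Predicate it matches, and records matching none are dropped, hence the catch-all predicate at the end:
KStream<String, String>[] branches = stream.branch(
        (k, v) -> v.startsWith("foo"),  //first branch: values starting with "foo"
        (k, v) -> v.length() > 5,       //second branch: remaining lengthy values
        (k, v) -> true);                //catch-all for everything else

branches[0].to("foo-words");
branches[1].to("lengthy-words");
branches[2].to("other-words");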
This blog post summarized most of the key stateless operations available in Kafka Streams along with code examples. You can combine them to build powerful streaming applications with the flexibility to adopt a functional programming style (using Java Lambda Expressions) or a more traditional, explicit way of defining your data processing logic.
Happy Building!