Data processing with Kafka Streams - An overview of stateless operations
Learn stateless operations in Kafka Streams with practical examples.
Abhishek Gupta
Amazon Employee
Published Sep 27, 2022
Last Modified Mar 14, 2024
Apache Kafka serves as a key component in data architectures. It has a rich ecosystem for building scalable, data-intensive services, including data pipelines.
- Kafka (Producer and Consumer) client APIs allow you to choose from a variety of programming languages to produce and consume data from Kafka topics.
- You can integrate heterogeneous data systems using Kafka Connect which has an extensive suite of pre-built connectors and a framework that allows you to build custom integrations.
- You can use Kafka Streams (Java library) to develop streaming applications to process data flowing through Kafka topics.
Common requirements in data processing include filtering data, transforming it from one form to another, and applying an action to each data record. These are often categorized as stateless operations. Kafka Streams is an ideal candidate if you want to apply stateless transformations to streaming data in Kafka. The `KStream` abstraction (part of the Kafka Streams DSL API) offers functions such as `filter`, `map`, and `groupBy`.

In this blog post, you will get an overview of these stateless operations, along with practical examples and code snippets. I have grouped them into the following categories: `map`, `filter`, `group`, and terminal operations, along with some miscellaneous features.

To work with Kafka Streams, you start by creating an instance of `KafkaStreams`, which serves as the entry point of your stream processing application. It needs a `Topology` along with a `java.util.Properties` object for additional configuration. A `Topology` is what defines your stream processing application - it's an acyclic graph of sources, processors, and sinks. Once you have defined the `Topology`, create the `KafkaStreams` instance and start processing.
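Here is a minimal sketch of that sequence (the application ID, bootstrap server, and topic names are placeholders you would adapt):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateless-ops-app"); // placeholder
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic"); // placeholder topic
// ... define processors and sinks on the stream (examples below) ...

KafkaStreams app = new KafkaStreams(builder.build(), config);
app.start();
```

The snippets that follow assume this setup, i.e. a `StreamsBuilder` named `builder` and a `KStream<String, String>` named `stream`.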
Let's dive into the specifics of the Kafka Streams APIs which implement these stateless operations.
`map` is a commonly used stateless operation which can be used to transform each record in the input `KStream` by applying a mapper function. It is available in multiple flavors: `map`, `mapValues`, `flatMap`, and `flatMapValues`.

For example, to convert key and value to uppercase, use the `map` method.
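A minimal sketch, assuming String keys and values:

```java
// transform both key and value of every record
KStream<String, String> upperCased =
    stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.toUpperCase()));
```

(`KeyValue` comes from `org.apache.kafka.streams.KeyValue`.)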
If you only need to alter the value, use `mapValues`.
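For example:

```java
// the key is left untouched; only the value is transformed
KStream<String, String> upperCasedValues = stream.mapValues(value -> value.toUpperCase());
```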
`flatMap` is similar to `map`, but it allows you to return multiple records (`KeyValue`s).

Say you have a stream of records `(foo <-> a,b,c)`, `(bar <-> d,e)` etc., where `foo` and `bar` are keys and `"a,b,c"`, `"d,e"` are CSV string values. You want the resulting stream to be as follows: `(foo,a)`, `(foo,b)`, `(foo,c)`, `(bar,d)`, `(bar,e)`. Here is an example of how this can be achieved.
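A sketch of one way to do it (using `java.util.List` and `java.util.ArrayList`):

```java
// fan out each CSV record into one record per element, keeping the original key
KStream<String, String> exploded = stream.flatMap((key, csv) -> {
    List<KeyValue<String, String>> records = new ArrayList<>();
    for (String part : csv.split(",")) {
        records.add(KeyValue.pair(key, part));
    }
    return records;
});
```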
Each record in the stream gets `flatMap`ped such that each CSV (comma-separated) value is first split into its constituents, and a `KeyValue` pair is created for each part of the CSV string. There is also `flatMapValues`, in case you only want to accept a value from the stream and return a collection of values.
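A sketch of that variant, for the same CSV data (using `java.util.Arrays`):

```java
// keys are untouched; each CSV value fans out into a collection of values
KStream<String, String> explodedValues =
    stream.flatMapValues(csv -> Arrays.asList(csv.split(",")));
```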
For example, say the values in a topic are words and you want to include only the ones that are longer than a specified length. You can use `filter`, since it allows you to include records based on a criterion that can be defined using a `Predicate`. The result is a new `KStream` instance with the filtered records.
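A sketch, with an arbitrary threshold of five characters:

```java
// keep only the words longer than five characters
KStream<String, String> longWords = stream.filter((key, word) -> word.length() > 5);
```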
`filterNot` lets you exclude records based on a criterion. Here is an example (lambda style).
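A sketch, inverting the filter above:

```java
// drop the words longer than five characters
KStream<String, String> shortWords = stream.filterNot((key, word) -> word.length() > 5);
```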
Grouping is often a prerequisite to stateful aggregations in Kafka Streams. To group records by their key, you can use `groupByKey`, as such.
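A minimal example:

```java
// group records that share the same key
KGroupedStream<String, String> grouped = stream.groupByKey();
```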
`groupByKey` returns a `KGroupedStream` object.

`groupBy` is a generic version of `groupByKey` which gives you the ability to group based on a different key, using a `KeyValueMapper`.
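For instance, to group by the record value instead of the key:

```java
// the KeyValueMapper returns the new grouping key (here, the value itself)
KGroupedStream<String, String> groupedByValue = stream.groupBy((key, value) -> value);
```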
`groupByKey` and `groupBy` allow you to specify a different `Serde` (`Serializer` and `Deserializer`) instead of the default ones. Just use the overloaded version which accepts a `Grouped` object.
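A sketch using explicit String Serdes:

```java
// override the default Serdes for this grouping operation
KGroupedStream<String, String> grouped =
    stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
```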
Not all stateless computations return intermediate results such as a `KStream`, `KTable` etc. They are often called terminal operations, whose methods return `void`. Let's look at a few examples.

You may want to write the results of a stateless operation back to Kafka - most likely, to a different topic. You can use the `to` method to store the records of a `KStream` in a topic in Kafka.
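For example (the topic name is a placeholder):

```java
// write the records of this KStream to an output topic
stream.to("output-topic");
```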
An overloaded version of `to` allows you to specify a `Produced` object to customize the `Serdes` and partitioner.
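A sketch with explicit String Serdes (another overload of `Produced.with` also accepts a `StreamPartitioner`):

```java
stream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
```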
You are not limited to a fixed/static topic name. `TopicNameExtractor` allows you to include custom logic to choose a specific topic in a dynamic manner. Here is a simplified example - say you are using a `map` operation to convert each record to its uppercase form, and need to store it in a topic that has the original name with `_uppercase` appended to it.
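A sketch of how that could look, with the `TopicNameExtractor` supplied as a lambda:

```java
stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.toUpperCase()))
      // choose the target topic per record, based on the source topic name
      .to((key, value, recordContext) -> recordContext.topic() + "_uppercase");
```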
In this example, we make use of `RecordContext` (which contains record metadata) to get the name of the source topic. In all the above cases, the sink/target topic should already exist in Kafka.
`print` is useful for debugging purposes - you can log each record in the `KStream`. It also accepts an instance of `Printed` to configure the behavior.
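For example, logging to standard output with an arbitrary label:

```java
stream.print(Printed.<String, String>toSysOut().withLabel("uppercase"));
// each record is logged roughly as: [uppercase]: FOO, BAR
```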
This will print out the records; e.g., if you pass in `(foo, bar)` and `(john, doe)` to the input topic, they will get converted to uppercase and logged. You can also use `Printed.toFile` (instead of `toSysOut`) to target a specific file.
`foreach` is yet another terminal operation, but it accepts a `ForeachAction` so you can specify what you want to do with each record in the `KStream`.
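A minimal sketch:

```java
// apply a side effect to every record; foreach returns void
stream.foreach((key, value) -> System.out.println("key=" + key + ", value=" + value));
```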
Here are some other useful operations offered by the Kafka Streams API.

Since `print` is a terminal operation, you no longer have access to the original `KStream`. This is where `peek` comes in handy, because it returns the same `KStream` instance. Just like `foreach`, it accepts a `ForeachAction` which you can use to specify what you want to do for each record.
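For example:

```java
// log each record, but keep the KStream so processing can continue
KStream<String, String> peeked =
    stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
```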
The flexibility of `peek` means that not only can you log each key-value pair, but you can also materialize them to an output topic (unlike the `print` operation), using the same chain of method calls.
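A sketch of such a chain (the topic name is a placeholder):

```java
stream.map((key, value) -> KeyValue.pair(key.toUpperCase(), value.toUpperCase()))
      .peek((key, value) -> System.out.println("key=" + key + ", value=" + value)) // log each record...
      .to("output-topic"); // ...and still write the results to a topic
```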
While developing your processing pipelines with the Kafka Streams DSL, you will often find yourself pushing resulting stream records to an output topic using `to` and then creating a new stream from that (output) topic. Say you want to transform records, store the result in a topic, and then continue to process the new (transformed) records. Here is how you can get this done.
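A sketch, with placeholder topic names:

```java
// transform and store the result in an intermediate topic
stream.mapValues(value -> value.toUpperCase()).to("uppercase-topic");

// create a new KStream from that topic to continue processing
KStream<String, String> stream2 = builder.stream("uppercase-topic");
stream2.filter((key, value) -> value.length() > 5).to("filtered-topic");
```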
Since `to` is a terminal operation, a new `KStream` (`stream2`) had to be created. The `through` method can help simplify this. You can rewrite the above using a single chain of calls.
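The same pipeline, sketched as one chain:

```java
stream.mapValues(value -> value.toUpperCase())
      .through("uppercase-topic") // writes to the topic and returns a new KStream
      .filter((key, value) -> value.length() > 5)
      .to("filtered-topic");
```

Note that newer Kafka Streams releases deprecate `through` in favor of `repartition`.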
Say you have streaming data coming into two different Kafka topics, each of which is represented by a `KStream`. You can merge the contents of these `KStream`s into a single stream.
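A sketch with two placeholder topics:

```java
KStream<String, String> left = builder.stream("topic-one");
KStream<String, String> right = builder.stream("topic-two");

// combine both streams into one
KStream<String, String> merged = left.merge(right);
```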
Caveat: the resulting stream may not have all the records in order.

With the help of a `KeyValueMapper`, `selectKey` allows you to derive a new key with a different data type.
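For example, re-keying by the length of the value (note the new key type):

```java
KStream<Integer, String> rekeyed = stream.selectKey((key, value) -> value.length());
```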
`branch` seems quite interesting, but it's something I have not used a lot (to be honest!). You can use it to evaluate every record in a `KStream` against multiple criteria (represented by `Predicate`s) and produce multiple `KStream`s (an array) as output. The key differentiator is that you can use multiple `Predicate`s instead of a single one, as is the case with `filter` and `filterNot`.
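A sketch with three arbitrary predicates:

```java
// each record goes to the stream of the first predicate it matches;
// records matching no predicate are dropped
KStream<String, String>[] branches = stream.branch(
    (key, value) -> value.startsWith("a"),
    (key, value) -> value.startsWith("b"),
    (key, value) -> true // catch-all
);
```

(Newer Kafka Streams releases deprecate `branch` in favor of `split`.)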
This blog post summarized most of the key stateless operations available in Kafka Streams, along with code examples. You can combine them to build powerful streaming applications with the flexibility to adopt a functional programming style (using Java Lambda Expressions) or a more traditional, explicit way of defining your data processing logic.
Happy Building!
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.