Data processing with Kafka Streams - An overview of stateless operations
Learn stateless operations in Kafka Streams with practical examples.
- Kafka (Producer and Consumer) client APIs allow you to choose from a variety of programming languages to produce and consume data from Kafka topics.
- You can integrate heterogeneous data systems using Kafka Connect, which has an extensive suite of pre-built connectors and a framework that allows you to build custom integrations.
- You can use Kafka Streams (a Java library) to develop streaming applications to process data flowing through Kafka topics.
The KStream abstraction (part of the Kafka Streams DSL API) offers functions such as filter, map, groupBy, etc. This post covers the map, filter, and group operations, the terminal operations, and a few miscellaneous features. To start with, your application needs a java.util.Properties object for configuration:

```java
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
```
A Topology is what defines your stream processing application - it's an acyclic graph of sources, processors, and sinks. Once you have defined the Topology, create the KafkaStreams instance and start processing:

```java
//details omitted
Topology topology = ....;
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
//block
new CountDownLatch(1).await();
```
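For completeness, here is a minimal sketch of how these pieces typically fit together: the Topology is built from a StreamsBuilder, and a shutdown hook closes the KafkaStreams instance when the JVM exits. The class name, application id, input topic name, and the latch/shutdown-hook pattern are illustrative assumptions, not part of the original snippet.

```java
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class App {
    static final String APP_ID = "streams-overview-app"; // hypothetical application id

    public static void main(String[] args) throws InterruptedException {
        Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // the Topology is usually obtained from a StreamsBuilder
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("words"); // hypothetical input topic; add processors/sinks here
        Topology topology = builder.build();

        KafkaStreams app = new KafkaStreams(topology, config);
        CountDownLatch latch = new CountDownLatch(1);

        // close the streams instance gracefully when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            app.close();
            latch.countDown();
        }));

        app.start();
        latch.await(); // block the main thread until shutdown
    }
}
```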
Map operations let you transform individual records of a KStream by applying a mapper function. The variants are map, mapValues, flatMap, and flatMapValues. Here is an example using the map method:

```java
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
    }
});
```
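For comparison, here is a lambda-style sketch of the same mapper (equivalent to the anonymous class above):

```java
// transform both the key and the value of each record to upper case
stream.map((k, v) -> new KeyValue<>(k.toUpperCase(), v.toUpperCase()));
```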
If you only need to transform the value, use mapValues:

```java
stream.mapValues(value -> value.toUpperCase());
```
flatMap is similar to map, but it allows you to return multiple records (KeyValues). Say you have records of the form (foo <-> a,b,c), (bar <-> d,e) etc., where foo and bar are keys and "a,b,c", "d,e" are CSV string values. You want the resulting stream to be as follows: (foo,a), (foo,b), (foo,c), (bar,d), (bar,e). Here is an example of how this can be achieved:

```java
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csvRecord) {
        String[] values = csvRecord.split(",");
        return Arrays.asList(values)
                     .stream()
                     .map(value -> new KeyValue<>(k, value))
                     .collect(Collectors.toList());
    }
})
```
The records are flatMapped such that each CSV (comma-separated) value is first split into its constituents, and a KeyValue pair is created for each part of the CSV string.
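flatMapValues, mentioned earlier, is the value-only counterpart: the key is left untouched and the mapper returns an Iterable of new values. A minimal sketch for the same CSV scenario (assuming string keys and values, and the same stream as above):

```java
// keep the original key and emit one record per CSV element
stream.flatMapValues(csvRecord -> Arrays.asList(csvRecord.split(",")));
```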
filter lets you include records based on a criterion, in the form of a Predicate; it returns a new KStream instance with the filtered records:

```java
KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
    public boolean test(String k, String v) {
        return v.length() > 5;
    }
})
```
filterNot lets you exclude records based on a criterion. Here is an example (lambda style):

```java
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key, value) -> value.startsWith("foo"));
```
To group records of a KStream by their existing key, use groupByKey as such:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KGroupedStream<String, String> kgs = stream.groupByKey();
```

groupByKey returns a KGroupedStream object.
groupBy is a generic version of groupByKey which gives you the ability to group based on a different key using a KeyValueMapper:

```java
stream.groupBy(new KeyValueMapper<String, String, String>() {
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});
```
groupByKey and groupBy allow you to specify a different Serde (Serializer and Deserializer) instead of the default ones. Just use the overloaded version which accepts a Grouped object:

```java
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));
```
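groupBy has a similar overload that takes both the KeyValueMapper and a Grouped instance. A minimal sketch (the choice of String Serdes here is an assumption for illustration):

```java
// group by the upper-cased key, with explicit Serdes for the repartitioned data
stream.groupBy((k, v) -> k.toUpperCase(), Grouped.with(Serdes.String(), Serdes.String()));
```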
Some operations do not return a KStream, KTable, etc. They are often called terminal operations, and their methods return void. Let's look at a few examples. The to operation stores the records of a KStream to a topic in Kafka:

```java
KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
      .to("uppercase-words");
```
to allows you to specify a Produced object to customize the Serdes and partitioner:

```java
stream.mapValues(value -> value.toUpperCase())
      .to("output-topic", Produced.with(Serdes.Bytes(), Serdes.Long()));
```
You can also choose the target topic dynamically with a TopicNameExtractor. Say you use a map operation to convert each record to its upper case, and need to store it in a topic that has the original name with _uppercase appended to it:

```java
stream.mapValues(value -> value.toUpperCase())
      .to(new TopicNameExtractor<String, String>() {
          public String extract(String k, String v, RecordContext rc) {
              return rc.topic() + "_uppercase";
          }
      });
```

Here, the RecordContext (which contains record metadata) is used to get the name of the source topic. In all the above cases, the sink/target topic should pre-exist in Kafka.
print is a terminal operation that simply prints the records of the KStream. It also accepts an instance of Printed to configure the behavior:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());
```

If you send records (foo, bar) and (john, doe) to the input topic, they will get converted to uppercase and logged as such:

```
[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE
```
You can also use Printed.toFile (instead of toSysOut) to target a specific file.
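For example (the file path below is an arbitrary placeholder):

```java
// write the records to a file instead of standard output
stream.mapValues(v -> v.toUpperCase()).print(Printed.toFile("/tmp/uppercase-words.txt"));
```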
foreach is yet another terminal operation, but it accepts a ForeachAction so you can specify what you want to do with each record in the KStream.
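A minimal sketch, lambda style (the System.out logging is just an illustrative action):

```java
// consume each record; foreach returns void, so the chain ends here
stream.foreach((key, value) -> System.out.println("key=" + key + ", value=" + value));
```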
Since print is a terminal operation, you no longer have access to the original KStream afterwards. This is where peek comes in handy, because it returns the same KStream instance. Just like foreach, it accepts a ForeachAction which you can use to specify what you want to do for each record. You can continue processing (and also log each record, similar to the print operation) using the same chain of method calls:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
      .peek((k, v) -> System.out.println("key=" + k + ", value=" + v))
      .to(OUTPUT_TOPIC);
```
A common pattern is to store intermediate results to a topic using to and then create a new stream from that (output) topic to continue processing:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
//continue processing with stream2
stream2.filter((k, v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);
```
Since to is a terminal operation, a new KStream (stream2) had to be created. The through method can help simplify this. You can rewrite the above using a single chain of calls:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
      .through(OUTPUT_TOPIC)
      .filter((k, v) -> v.length() > 5)
      .to(LENGTHY_WORDS_TOPIC);
```
If you have records spread across multiple topics, each represented by its own KStream, you can merge the contents of these KStreams into a single stream:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");
```

Caveat: The resulting stream may not have all the records in order.
Using a KeyValueMapper, selectKey allows you to derive a new key with a different data type:

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<Integer, String, String>() {
    public String apply(Integer k, String v) {
        return Integer.toString(k);
    }
})
```
Finally, branch lets you evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and produce multiple KStreams (an array) as output. The key differentiator is that you can use multiple Predicates instead of a single one, as is the case with filter and filterNot.
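Here is a minimal sketch of branch (the predicates and topic names are arbitrary examples). Each record is routed to the branch of the first predicate it matches; records that match none are dropped.

```java
// split the stream into two: values starting with "foo", and values longer than 5 characters
KStream<String, String>[] branches = stream.branch(
        (k, v) -> v.startsWith("foo"),
        (k, v) -> v.length() > 5
);
branches[0].to("foo-topic");      // records matching the first predicate
branches[1].to("lengthy-topic");  // records matching the second predicate
```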
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.