I Solved a Nasty Data Deserialization Problem involving Kafka, Go, and Protocol Buffers using Amazon Q. Here is How.
Every once in a while, software engineers get stuck with a problem that torments them for days. In this blog post, I share how Amazon Q helped me solve a problem related to data deserialization using Protocol Buffers.

It all started with the .proto file the other team used to generate the code for their project. This is the mental model Protobuf uses: different teams relying on the same domain model definition.
syntax = "proto3";
message Person {
string userName = 1;
optional int64 favoriteNumber = 2;
repeated string interests = 3;
}
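Compiling the .proto file with protoc and the Go plugin produces a person.pb.go file. For context, here is a simplified sketch of the struct it contains; the real generated file also carries internal bookkeeping fields, accessor methods, and descriptor data, and the command and struct tags below are illustrative, assuming the file is named person.proto.

// protoc --go_out=. --go_opt=paths=source_relative person.proto
type Person struct {
	// ... unexported bookkeeping fields omitted ...

	UserName       string   `protobuf:"bytes,1,opt,name=userName,proto3"`
	FavoriteNumber *int64   `protobuf:"varint,2,opt,name=favoriteNumber,proto3,oneof"` // optional fields become pointers
	Interests      []string `protobuf:"bytes,3,rep,name=interests,proto3"`
}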
That generated person.pb.go is the code that describes the Person{} struct from the domain model. From this point on, the job was essentially to refactor the code to read the messages from Kafka, decode each message using Protobuf, and continue the logic of the code as usual. Protobuf has first-class support for Go, so I assumed it wouldn't be that hard. Indeed, I implemented the code fairly quickly and got ready for the tests. Here is the initial version of the code I wrote:
func main() {
	sigchan := make(chan os.Signal, 1)
	// Relay SIGINT/SIGTERM so the loop below can shut down cleanly.
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	topics := []string{topicName}
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        bootstrapServers,
		"broker.address.family":    "v4",
		"group.id":                 uuid.New(),
		"session.timeout.ms":       6000,
		"auto.offset.reset":        "earliest",
		"enable.auto.offset.store": false,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	c.SubscribeTopics(topics, nil)

	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				// Decode the raw bytes straight into the generated Person struct.
				person := Person{}
				err = proto.Unmarshal(e.Value, &person)
				if err == nil {
					fmt.Printf("🧑🏻💻 userName: %s, favoriteNumber: %d, interests: %s\n",
						person.UserName, *person.FavoriteNumber, person.Interests)
				}
				_, err = c.StoreMessage(e)
				if err != nil {
					fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
						e.TopicPartition)
				}
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			}
		}
	}
	fmt.Printf("Closing consumer\n")
	c.Close()
}
The most important part of this code is the call to proto.Unmarshal inside the *kafka.Message case. There, I receive the message from Kafka as a []byte and start deserializing it back into the data structure the message represents, the Person{} struct in this case. For this, I used the google.golang.org/protobuf/proto package to decode the slice of bytes into a pointer to the struct.

"cannot parse invalid wire-format data"


Here are the suggestions Amazon Q gave me about this error, and why none of them could be the cause in my case:

| Suggestion | It can't be because |
|---|---|
| The Protobuf data is corrupted or invalid in some way. This could happen if it was modified after encoding. | I was the only one who wrote the data into the Kafka topic. Moreover, I could verify that the message stored was the same one I received in my debugging session. |
| The Go struct you are trying to unmarshal into does not match the structure of the Protobuf data. All the fields need to be in the same order and of compatible types. | I was working with a fresh copy of the code generated by the Protobuf compiler. No modifications were made to the generated code. |
| Double-check that the Person struct matches the structure defined in the .proto file used to encode it. | Yes, I double-checked, and it matches. |
Amazon Q's remaining suggestions pointed at the raw bytes of the message itself (a quick way to dump them is sketched right after this list):

- Check if the length prefix is corrupted or incorrect.
- Check if the data is valid Protobuf data to begin with.
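For the second check, a one-line addition to the *kafka.Message case of the consumer shown earlier is enough; this is a sketch, not the original debugging code:

case *kafka.Message:
	// Hex-dump the bytes received from Kafka so they can be compared with
	// what a valid Protobuf encoding of Person should look like.
	fmt.Printf("raw payload: %x\n", e.Value)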
Dumping the payload of one of the messages from the topic, this is what I got:
0000101078210599971141006f02000e074d61727665
Next, I asked Amazon Q to break down, byte by byte, how a message like this should look on the wire, given the .proto file of the project.

In the breakdown Amazon Q gave back, the byte 08 represents the length of the string used for the userName field; in the example provided, John Doe was used as the user name, and this name has exactly 8 characters. The message that I sent to Kafka had Ricardo as the user name, so near the start of the array I should be seeing a length byte of 07, not a run of 00s. Something indeed was wrong with the message I was receiving in my Go code. Hands down, the investigation hypothesis that I got from Amazon Q was spot on.
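To double-check that expectation, encoding the same Person locally and printing the bytes Protobuf produces on its own makes the contrast obvious. This is a minimal sketch that assumes it sits next to the generated person.pb.go; the hex in the comment is what standard field-number-ordered Protobuf encoding yields for these values:

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
)

func main() {
	favorite := int64(14)
	person := &Person{
		UserName:       "Ricardo",
		FavoriteNumber: &favorite,
		Interests:      []string{"Marvel"},
	}
	data, err := proto.Marshal(person)
	if err != nil {
		panic(err)
	}
	// Expected output: 0a075269636172646f100e1a064d617276656c
	// 0a = field 1 (userName), 07 = length of "Ricardo", then the string bytes,
	// followed by favoriteNumber (10 0e) and interests (1a 06 "Marvel").
	fmt.Printf("%x\n", data)
}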

The next question was why the byte array 0000101078210599971141006f02000e074d61727665 starts with a bunch of zeros instead of 07. To answer that, I went back to the Java code I had used to write the messages to the topic:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class.getName());
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);

try (KafkaProducer<Integer, Person> producer = new KafkaProducer<>(properties)) {
    Person person = Person.newBuilder()
        .setUserName("Ricardo")
        .setFavoriteNumber(14)
        .addInterests("Marvel")
        .build();
    ProducerRecord<Integer, Person> record = new ProducerRecord<>(TOPIC_NAME, 1, person);
    producer.send(
        record, (recordMetadata, e) -> {
            System.out.printf("➡️ Message sent successfully to topic [%s] ✅\n", recordMetadata.topic());
        });
}


The important detail in this code is the value serializer configuration: a class called KafkaProtobufSerializer is the one that effectively carries out the serialization. Understanding this was the key to figuring out how the message was ending up with the bytes in that order. I searched for the Java code that implements that class on GitHub and asked Amazon Q what exactly it writes to Kafka.

Amazon Q's explanation made everything click: the KafkaProtobufSerializer does not write plain Protobuf bytes. Before the actual payload, it prepends a magic byte, the four-byte ID of the schema registered in Schema Registry, and the indexes of the message type within the .proto file. That framing is exactly why the byte array I was receiving starts with a bunch of zeros, and why proto.Unmarshal alone could not make sense of it.
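To make the framing concrete, here is a minimal sketch of how those leading bytes could be peeled off by hand. It assumes the common case of a single top-level message in the schema, so the message-index list collapses to a single zero byte, and it assumes the generated person.pb.go lives in the same package; decodeConfluentFraming and its error handling are illustrative, not part of the original project:

package main

import (
	"encoding/binary"
	"fmt"

	"google.golang.org/protobuf/proto"
)

// decodeConfluentFraming shows why proto.Unmarshal fails on the raw bytes:
// the payload is framed as [0x00 magic byte][4-byte schema ID][message indexes][Protobuf payload].
func decodeConfluentFraming(payload []byte) error {
	if len(payload) < 6 || payload[0] != 0x00 {
		return fmt.Errorf("not a Schema Registry framed message")
	}
	schemaID := binary.BigEndian.Uint32(payload[1:5])
	person := Person{}
	// Skip magic byte (1) + schema ID (4) + message index (1) = 6 bytes,
	// then the remainder is the plain Protobuf payload.
	if err := proto.Unmarshal(payload[6:], &person); err != nil {
		return err
	}
	fmt.Printf("schema id=%d, userName=%s\n", schemaID, person.GetUserName())
	return nil
}

Hand-stripping the header like this is brittle, though. The robust fix is to let Confluent's Schema Registry deserializer for Go deal with the framing, which is exactly what the final version of the consumer does: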
func main() {
	sigchan := make(chan os.Signal, 1)
	// Relay SIGINT/SIGTERM so the loop below can shut down cleanly.
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	topics := []string{topicName}
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        bootstrapServers,
		"broker.address.family":    "v4",
		"group.id":                 uuid.New(),
		"session.timeout.ms":       6000,
		"auto.offset.reset":        "earliest",
		"enable.auto.offset.store": false,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	// Create a Schema Registry client and a Protobuf deserializer that knows
	// how to strip the magic byte, schema ID, and message indexes.
	client, err := schemaregistry.NewClient(schemaregistry.NewConfig(schemaRegistryURL))
	if err != nil {
		fmt.Printf("Failed to create schema registry client: %s\n", err)
		os.Exit(1)
	}
	deser, err := protobuf.NewDeserializer(client, serde.ValueSerde, protobuf.NewDeserializerConfig())
	if err != nil {
		fmt.Printf("Failed to create deserializer: %s\n", err)
		os.Exit(1)
	}
	// Tell the deserializer which Go type corresponds to the Person message.
	deser.ProtoRegistry.RegisterMessage((&Person{}).ProtoReflect().Type())

	c.SubscribeTopics(topics, nil)
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				value, err := deser.Deserialize(*e.TopicPartition.Topic, e.Value)
				if err != nil {
					fmt.Fprintf(os.Stderr, "Error deserializing message: %s\n", err)
					os.Exit(1)
				}
				person := value.(*Person)
				fmt.Printf("🧑🏻💻 userName: %s, favoriteNumber: %d, interests: %s\n",
					person.UserName, *person.FavoriteNumber, person.Interests)
				_, err = c.StoreMessage(e)
				if err != nil {
					fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
						e.TopicPartition)
				}
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			}
		}
	}
	fmt.Printf("Closing consumer\n")
	c.Close()
}
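For completeness, here is the import block this final consumer needs, sketched under the assumption that the project uses confluent-kafka-go v2 (on the 1.x line the same packages live under github.com/confluentinc/confluent-kafka-go without the /v2 segment) and a UUID package whose New() returns a string:

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf"
	"github.com/pborman/uuid" // assumption: New() here returns a string, matching the group.id usage above
)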
