Ingesting Data into OpenSearch using Apache Kafka and Go.
There are times you might need to write a custom integration layer to fulfill specific requirements in your data pipeline. Learn how to do this with Kafka and OpenSearch using Go.
Introduction to Amazon OpenSearch Serverless and Amazon MSK Serverless
Application overview and key architectural considerations
How do Go client applications authenticate with MSK Serverless using IAM?
OpenSearch Serverless collection
You are now ready to run the application!
Query movies data in OpenSearch
Fetch data only for specific fields
About | |
---|---|
✅ AWS experience | 200 - Intermediate |
⏱ Time to complete | 60 minutes |
💰 Cost to complete | Free tier eligible |
🧩 Prerequisites | - AWS account - CDK installed: Visit Get Started with AWS CDK to learn more. |
💻 Code Sample | Code sample used in tutorial on GitHub |
📢 Feedback | Any feedback, issues, or just a 👍 / 👎 ? |
⏰ Last Updated | 2023-07-10 |
- Overview of how to set up the required AWS services - OpenSearch Serverless, MSK Serverless, AWS Cloud9 along with IAM policies and security configurations.
- High-level walk through of the application.
- Get the data ingestion pipeline up and running.
- How to query data in OpenSearch.
Amazon OpenSearch Service supports OpenSearch and legacy Elasticsearch OSS (up to 7.10, the final open source version of the software). When you create a cluster, you can choose which search engine to use.
You can refer to the details in Comparing OpenSearch Service and OpenSearch Serverless.
The application has two components, a producer and a consumer, both running on an EC2 instance:
- As the name suggests, the producer sends data to the MSK Serverless cluster.
- The consumer application receives data (`movie` information) from the MSK Serverless topic and uses the OpenSearch Go client to index data in the `movies` collection.
- The producer and consumer applications run on the same compute platform (an EC2 instance).
- There is a single consumer application instance processing data from the MSK topic. However, you can try to run multiple instances of the consumer application and see how the data is distributed across the instances.
- Instead of using the Kafka CLI to produce data, a custom producer application was written in Go along with a REST endpoint to send data. This demonstrates how to write a Kafka producer application in Go and mimics the Kafka CLI.
- The volume of data used is small.
- OpenSearch Serverless collection has a Public access type.
- Choose an appropriate compute platform for your consumer application based on data volume and scalability requirements - more on this below.
- Choose the VPC access type for your OpenSearch Serverless collection.
- Consider using Amazon OpenSearch Ingestion to create your data pipelines.
- Containers - You can package your consumer application as a Docker container (a `Dockerfile` is available in the GitHub repository) and deploy it to Amazon EKS or Amazon ECS.
  - If you deploy the application to Amazon EKS, you can also consider using KEDA to auto-scale your consumer application based on the number of messages in the MSK topic.
- Serverless - It's also possible to use MSK as an event source for AWS Lambda functions. You can write your consumer application as a Lambda function and configure it to be triggered by MSK events or alternatively run it on AWS Fargate.
- Since the producer application is a REST API, you can deploy it to AWS App Runner.
- Finally, you can leverage Amazon EC2 Auto Scaling groups to auto-scale the EC2 fleet for your consumer application.
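The producer design described above (a REST endpoint that forwards each request body to a Kafka topic) can be sketched roughly as follows. This is a minimal sketch and not the code from the sample repository: the `Producer` interface, the `logProducer` stand-in, and the `/movies` path are assumptions made for illustration. A real implementation would back `Producer` with a Kafka client such as franz-go.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Producer abstracts "send one record to the topic" (hypothetical interface).
type Producer interface {
	Produce(value []byte) error
}

// logProducer is a stand-in that only remembers what it was asked to send.
type logProducer struct{ sent [][]byte }

func (p *logProducer) Produce(value []byte) error {
	p.sent = append(p.sent, value)
	return nil
}

var errInvalidJSON = errors.New("payload is not valid JSON")

// forward rejects payloads that are not JSON and hands the rest to the producer.
func forward(p Producer, payload []byte) error {
	if !json.Valid(payload) {
		return errInvalidJSON
	}
	return p.Produce(payload)
}

// handler exposes forward as an HTTP POST endpoint.
func handler(p Producer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "use POST", http.StatusMethodNotAllowed)
			return
		}
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		if err := forward(p, body); err != nil {
			status := http.StatusInternalServerError
			if errors.Is(err, errInvalidJSON) {
				status = http.StatusBadRequest
			}
			http.Error(w, err.Error(), status)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}

func main() {
	p := &logProducer{}
	// Exercise the handler in-process. A real service would instead pass
	// the handler to http.ListenAndServe.
	req := httptest.NewRequest(http.MethodPost, "/movies", strings.NewReader(`{"title": "Rush"}`))
	rec := httptest.NewRecorder()
	handler(p).ServeHTTP(rec, req)
	fmt.Println("status:", rec.Code, "records forwarded:", len(p.sent))
}
```

Keeping the Kafka client behind a small interface like this makes the HTTP layer testable without a running cluster.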
Let's take a short detour into understanding how this works with Go.
The `franz-go` Kafka client library supports IAM authentication. Here are snippets from the consumer application that show how it works in practice:
func init() {
	// ......
	// Load the AWS configuration using credentials from the EC2 instance role.
	cfg, err = config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"), config.WithCredentialsProvider(ec2rolecreds.New()))
	creds, err = cfg.Credentials.Retrieve(context.Background())
	// ....
}

func initializeKafkaClient() {
	opts := []kgo.Opt{
		kgo.SeedBrokers(strings.Split(mskBroker, ",")...),
		// Authenticate to MSK Serverless using the AWS_MSK_IAM SASL mechanism.
		kgo.SASL(sasl_aws.ManagedStreamingIAM(func(ctx context.Context) (sasl_aws.Auth, error) {
			return sasl_aws.Auth{
				AccessKey:    creds.AccessKeyID,
				SecretKey:    creds.SecretAccessKey,
				SessionToken: creds.SessionToken,
				UserAgent:    "msk-ec2-consumer-app",
			}, nil
		})),
		// .....
	}
	// .....
}
- First, the application uses the `ec2rolecreds.New()` credentials provider to retrieve the temporary IAM credentials from the EC2 instance metadata service. The EC2 instance role should have the appropriate IAM permissions to execute the required operations on MSK cluster components (more on this in the subsequent sections).
- These credentials are then used to initialize the Kafka client with the `AWS_MSK_IAM` SASL authentication implementation in the `sasl_aws` package.
Note: Since there are multiple Go clients for Kafka (including Sarama), please make sure to consult their client documentation to confirm whether they support IAM authentication.
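The snippet above reads the credentials once in `init()`. Because instance role credentials are temporary, one possible variation is to fetch them inside the callback itself; to my understanding, franz-go invokes that callback whenever it needs to authenticate. The sketch below illustrates the idea with local stand-in types so it runs without the AWS SDK or a Kafka cluster: `Auth` only mirrors the fields used above, and `credentialSource`, `rotatingSource`, and `sessionTokens` are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Auth mirrors the fields set on sasl_aws.Auth in the snippet above.
// It is a local stand-in, not the franz-go type.
type Auth struct {
	AccessKey    string
	SecretKey    string
	SessionToken string
	UserAgent    string
}

// credentialSource is a hypothetical abstraction over "give me the current
// credentials", for example backed by an AWS SDK credentials provider.
type credentialSource interface {
	Retrieve(ctx context.Context) (accessKey, secretKey, sessionToken string, err error)
}

// authFunc returns a callback with the same shape as the function passed to
// ManagedStreamingIAM. Credentials are fetched on every call, so a refreshed
// session token is picked up the next time the client authenticates.
func authFunc(src credentialSource) func(context.Context) (Auth, error) {
	return func(ctx context.Context) (Auth, error) {
		ak, sk, tok, err := src.Retrieve(ctx)
		if err != nil {
			return Auth{}, fmt.Errorf("retrieve credentials: %w", err)
		}
		return Auth{AccessKey: ak, SecretKey: sk, SessionToken: tok, UserAgent: "msk-ec2-consumer-app"}, nil
	}
}

// rotatingSource simulates temporary credentials that change between calls.
type rotatingSource struct{ calls int }

func (r *rotatingSource) Retrieve(context.Context) (string, string, string, error) {
	r.calls++
	return "EXAMPLE-ACCESS-KEY", "example-secret", fmt.Sprintf("token-%d", r.calls), nil
}

// sessionTokens calls fn n times, as a client would on successive
// authentications, and collects the session tokens it received.
func sessionTokens(fn func(context.Context) (Auth, error), n int) ([]string, error) {
	tokens := make([]string, 0, n)
	for i := 0; i < n; i++ {
		auth, err := fn(context.Background())
		if err != nil {
			return nil, err
		}
		tokens = append(tokens, auth.SessionToken)
	}
	return tokens, nil
}

func main() {
	tokens, err := sessionTokens(authFunc(&rotatingSource{}), 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tokens)
}
```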
- Required IAM roles
- MSK Serverless Cluster
- OpenSearch Serverless collection
- AWS Cloud9 EC2 environment to run your application
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*"
],
"Resource": [
"<ARN of the MSK Serverless cluster>",
"arn:aws:kafka:us-east-1:<AWS_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*",
"arn:aws:kafka:us-east-1:<AWS_ACCOUNT_ID>:group/<MSK_CLUSTER_NAME>/*"
]
},
{
"Effect": "Allow",
"Action": [
"aoss:APIAccessAll"
],
"Resource": "*"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "ec2.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Note that for the purposes of this tutorial, we chose the Public access type. It's recommended to select VPC for production workloads.
git clone https://github.com/build-on-aws/opensearch-using-kafka-golang
cd opensearch-using-kafka-golang
cd msk-producer
export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies
go run main.go
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
starting producer app
http server ready
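The producer is configured through the `MSK_BROKER` and `MSK_TOPIC` environment variables. As a rough illustration of how such settings might be read and validated in Go (a sketch, not the sample's actual code), the hypothetical `loadConfig` function below takes the lookup function as a parameter so it can be exercised without touching the real environment. The comma-separated broker format matches the `strings.Split(mskBroker, ",")` call shown earlier; the `config` type is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// config holds the producer settings (hypothetical type).
type config struct {
	Brokers []string
	Topic   string
}

// loadConfig reads MSK_BROKER (comma-separated) and MSK_TOPIC via getenv
// and fails fast when either one is missing.
func loadConfig(getenv func(string) string) (config, error) {
	raw := getenv("MSK_BROKER")
	if raw == "" {
		return config{}, errors.New("missing MSK_BROKER")
	}
	topic := getenv("MSK_TOPIC")
	if topic == "" {
		return config{}, errors.New("missing MSK_TOPIC")
	}
	var brokers []string
	for _, b := range strings.Split(raw, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	if len(brokers) == 0 {
		return config{}, errors.New("MSK_BROKER contains no broker addresses")
	}
	return config{Brokers: brokers, Topic: topic}, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("MSK_BROKER", cfg.Brokers)
	fmt.Println("MSK_TOPIC", cfg.Topic)
}
```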
The `send-data.sh` script invokes the HTTP endpoint exposed by the application you just started and submits movie data (from the `movies.txt` file) in JSON format using `curl`:
./send-data.sh
producing data to topic
payload {"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
record produced successfully to offset 2 in partition 0 of topic movies
producing data to topic
payload {"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
record produced successfully to offset 4 in partition 1 of topic movies
.....
For the purposes of this tutorial and to keep it simple and easy to follow, the amount of data has been purposely restricted to 1500 records and the script intentionally sleeps for 1 second after sending each record to the producer. You should be able to follow along comfortably.
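The script itself is a shell script that uses `curl`. Purely as an illustration, the same send-and-pause loop could be expressed in Go along these lines. This sketch assumes the input holds one JSON document per line, and `sendRecords` and its `send` callback are hypothetical names; in practice `send` would POST the record to the producer's HTTP endpoint.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
	"time"
)

// sendRecords passes each non-empty line of data to send and pauses for
// delay after every record. It returns the number of records sent.
func sendRecords(data string, delay time.Duration, send func(record string) error) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(data))
	sent := 0
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue // skip blank lines
		}
		if err := send(line); err != nil {
			return sent, fmt.Errorf("record %d: %w", sent+1, err)
		}
		sent++
		time.Sleep(delay)
	}
	return sent, sc.Err()
}

func main() {
	data := "{\"title\": \"Don Jon\"}\n{\"title\": \"Rush\"}\n"
	// The tutorial's script pauses for 1 second; a short delay keeps the demo quick.
	n, err := sendRecords(data, 10*time.Millisecond, func(rec string) error {
		fmt.Println("sending", rec)
		return nil
	})
	fmt.Println("sent:", n, "error:", err)
}
```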
Now that data is being sent to the `movies` topic, you can start the consumer application to begin processing data from the MSK Serverless cluster and index it in the OpenSearch Serverless collection.
cd msk-consumer
export MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies
export OPENSEARCH_INDEX_NAME=movies-index
export OPENSEARCH_ENDPOINT_URL=<enter OpenSearch Serverless endpoint>
go run main.go
using default value for AWS_REGION - us-east-1
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
OPENSEARCH_INDEX_NAME movies-index
OPENSEARCH_ENDPOINT_URL <OpenSearch Serverless endpoint>
using credentials from: EC2RoleProvider
kafka consumer goroutine started. waiting for records
paritions ASSIGNED for topic movies [0 1 2]
got record from partition 1 key= val={"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
movie data indexed
committing offsets
got record from partition 2 key= val={"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
movie data indexed
committing offsets
.....
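The log output above suggests that the consumer indexes a record first and commits offsets afterwards. Assuming that ordering is intentional, it gives at-least-once behavior: a record whose indexing fails is not committed, so it can be read again later. A minimal sketch of that loop, using hypothetical `Record`, `Indexer`, and `Committer` types in place of the real Kafka and OpenSearch clients:

```go
package main

import (
	"errors"
	"fmt"
)

// Record is a simplified, hypothetical view of a Kafka record.
type Record struct {
	Partition int32
	Offset    int64
	Value     []byte
}

// Indexer and Committer stand in for the OpenSearch and Kafka clients.
type Indexer interface{ Index(doc []byte) error }
type Committer interface{ Commit(r Record) error }

// process indexes each record and commits its offset only after indexing
// succeeded. It stops at the first failure and reports how many records
// were fully handled.
func process(records []Record, idx Indexer, c Committer) (int, error) {
	done := 0
	for _, r := range records {
		if err := idx.Index(r.Value); err != nil {
			return done, fmt.Errorf("index partition %d offset %d: %w", r.Partition, r.Offset, err)
		}
		if err := c.Commit(r); err != nil {
			return done, fmt.Errorf("commit partition %d offset %d: %w", r.Partition, r.Offset, err)
		}
		done++
	}
	return done, nil
}

// fakeStore is an in-memory Indexer and Committer for demonstration.
type fakeStore struct {
	indexed   int
	committed []int64
	failAfter int // Index fails once this many docs are stored; negative disables
}

func (f *fakeStore) Index([]byte) error {
	if f.failAfter >= 0 && f.indexed >= f.failAfter {
		return errors.New("simulated indexing failure")
	}
	f.indexed++
	return nil
}

func (f *fakeStore) Commit(r Record) error {
	f.committed = append(f.committed, r.Offset)
	return nil
}

func main() {
	records := []Record{{Offset: 0}, {Offset: 1}, {Offset: 2}}
	store := &fakeStore{failAfter: 2}
	n, err := process(records, store, store)
	fmt.Println("handled:", n, "committed offsets:", store.committed, "error:", err)
}
```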
Once the process completes, you should have 1500 movies indexed in the OpenSearch Serverless collection. You don't have to wait for it to finish, though. Once a few hundred records have been indexed, you can navigate to Dev Tools in the OpenSearch dashboard to execute the queries below.
GET movies-index/_search
Use the `_source` option to retrieve the source from selected fields. For example, to retrieve only the `title`, `plot`, and `genres` fields, run the following query:
GET movies-index/_search
{
"_source": {
"includes": [
"title",
"plot",
"genres"
]
}
}
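If you want to issue the same request from Go rather than from Dev Tools, the request body can be built with `encoding/json`. This is a minimal sketch: the `searchRequest` and `sourceFilter` types and the `sourceOnly` helper are hypothetical names, and sending the body (for example with the OpenSearch Go client) is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sourceFilter models the "_source" object of a search request.
type sourceFilter struct {
	Includes []string `json:"includes"`
}

// searchRequest models only the part of the search body used here.
type searchRequest struct {
	Source *sourceFilter `json:"_source,omitempty"`
}

// sourceOnly builds a search body that returns just the given fields.
func sourceOnly(fields ...string) (string, error) {
	b, err := json.Marshal(searchRequest{Source: &sourceFilter{Includes: fields}})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	body, err := sourceOnly("title", "plot", "genres")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body) // {"_source":{"includes":["title","plot","genres"]}}
}
```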
To search for the term `christmas` in the `title` field, run the following query:
GET movies-index/_search
{
"query": {
"term": {
"title": {
"value": "christmas"
}
}
}
}
GET movies-index/_search
{
"_source": {
"includes": [
"title",
"actors"
]
},
"query": {
"query_string": {
"default_field": "title",
"query": "harry"
}
}
}
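A search response nests each matching document under `hits.hits[]._source`. As a rough sketch of consuming the result of the query above in Go, the code below decodes only the `title` and `actors` fields. The `movieHit` type and `parseHits` helper are hypothetical, and the sample response is hand-written for illustration, not real output from the collection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// movieHit holds the two fields requested via "_source".
type movieHit struct {
	Title  string   `json:"title"`
	Actors []string `json:"actors"`
}

// searchResponse models only the parts of the response used here.
type searchResponse struct {
	Hits struct {
		Hits []struct {
			Source movieHit `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

// parseHits extracts the documents from a raw search response.
func parseHits(raw string) ([]movieHit, error) {
	var resp searchResponse
	if err := json.Unmarshal([]byte(raw), &resp); err != nil {
		return nil, fmt.Errorf("decode search response: %w", err)
	}
	movies := make([]movieHit, 0, len(resp.Hits.Hits))
	for _, h := range resp.Hits.Hits {
		movies = append(movies, h.Source)
	}
	return movies, nil
}

func main() {
	// Illustrative, made-up response body.
	raw := `{"hits":{"hits":[{"_source":{"title":"Harry Example","actors":["Actor One","Actor Two"]}}]}}`
	movies, err := parseHits(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range movies {
		fmt.Println(m.Title, m.Actors)
	}
}
```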
`ratings`, `genre`, and `year` to search results based on the values of those fields. With aggregations, we can answer questions like 'How many movies are in each genre?'
GET movies-index/_search
{
"size":0,
"aggs": {
"genres": {
"terms":{"field": "genres.keyword"}
}
}
}
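The result of a `terms` aggregation comes back as a list of buckets, each with a `key` and a `doc_count`, under the aggregation name (`genres` here). A minimal sketch of turning that into a Go map follows. The `genreCounts` helper and its types are hypothetical, and the counts in the sample response are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bucket is one entry of a terms aggregation result.
type bucket struct {
	Key      string `json:"key"`
	DocCount int    `json:"doc_count"`
}

// aggResponse models only the "genres" aggregation of the response.
type aggResponse struct {
	Aggregations struct {
		Genres struct {
			Buckets []bucket `json:"buckets"`
		} `json:"genres"`
	} `json:"aggregations"`
}

// genreCounts maps each genre to the number of movies in it.
func genreCounts(raw string) (map[string]int, error) {
	var resp aggResponse
	if err := json.Unmarshal([]byte(raw), &resp); err != nil {
		return nil, fmt.Errorf("decode aggregation response: %w", err)
	}
	counts := make(map[string]int, len(resp.Aggregations.Genres.Buckets))
	for _, b := range resp.Aggregations.Genres.Buckets {
		counts[b.Key] = b.DocCount
	}
	return counts, nil
}

func main() {
	// Illustrative response with made-up counts.
	raw := `{"aggregations":{"genres":{"buckets":[{"key":"Drama","doc_count":3},{"key":"Comedy","doc_count":2}]}}}`
	counts, err := genreCounts(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(counts)
}
```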
- Also delete IAM roles and policies
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.