Ingesting Data into OpenSearch using Apache Kafka and Go.
There are times you might need to write a custom integration layer to fulfill specific requirements in your data pipeline. Learn how to do this with Kafka and OpenSearch using Go.
Introduction to Amazon OpenSearch Serverless and Amazon MSK Serverless
Application overview and key architectural considerations
How do Go client applications authenticate with MSK Serverless using IAM?
OpenSearch Serverless collection
You are now ready to run the application!
Query movies data in OpenSearch
Fetch data only for specific fields
About | |
---|---|
✅ AWS experience | 200 - Intermediate |
⏱ Time to complete | 60 minutes |
💰 Cost to complete | Free tier eligible |
🧩 Prerequisites | - AWS account - CDK installed: visit Get Started with AWS CDK to learn more. |
💻 Code Sample | Code sample used in tutorial on GitHub |
📢 Feedback | Any feedback, issues, or just a 👍 / 👎 ? |
⏰ Last Updated | 2023-07-10 |
- Overview of how to set up the required AWS services - OpenSearch Serverless, MSK Serverless, AWS Cloud9 along with IAM policies and security configurations.
- High-level walkthrough of the application.
- Get the data ingestion pipeline up and running.
- How to query data in OpenSearch.
Amazon OpenSearch Service supports OpenSearch and legacy Elasticsearch OSS (up to 7.10, the final open source version of the software). When you create a cluster, you have the option of which search engine to use.
You can refer to the details in Comparing OpenSearch Service and OpenSearch Serverless.
`EC2` instance:
- As the name suggests, the producer sends data to the MSK Serverless cluster.
- The consumer application receives data (`movie` information) from the MSK Serverless topic and uses the OpenSearch Go client to index data in the `movies` collection.
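The consumer's per-record flow described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the application's actual code: the `Indexer` interface, the `memoryIndexer` stand-in, and the `handleRecord` and `consume` helpers are hypothetical names, and the real application would use a Kafka client and the OpenSearch Go client in their place.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Indexer is a hypothetical abstraction for "index this document".
// Assumption: the real application calls the OpenSearch Go client here.
type Indexer interface {
	Index(index string, doc []byte) error
}

// memoryIndexer is a stand-in that keeps documents in memory per index name.
type memoryIndexer struct {
	docs map[string][][]byte
}

func (m *memoryIndexer) Index(index string, doc []byte) error {
	if m.docs == nil {
		m.docs = make(map[string][][]byte)
	}
	m.docs[index] = append(m.docs[index], doc)
	return nil
}

var errMalformed = errors.New("record is not valid JSON")

// handleRecord checks that one Kafka record value is valid JSON and
// forwards it unchanged to the movies index.
func handleRecord(value []byte, idx Indexer) error {
	if !json.Valid(value) {
		return errMalformed
	}
	return idx.Index("movies", value)
}

// consume processes a batch of record values and reports how many were
// indexed and how many were skipped.
func consume(values []string, idx Indexer) (indexed, skipped int) {
	for _, v := range values {
		if err := handleRecord([]byte(v), idx); err != nil {
			skipped++
			continue
		}
		indexed++
	}
	return indexed, skipped
}

func main() {
	idx := &memoryIndexer{}
	values := []string{`{"title":"Example","year":2001}`, `not json`}
	indexed, skipped := consume(values, idx)
	fmt.Println(indexed, skipped, len(idx.docs["movies"])) // prints: 1 1 1
}
```

Keeping the indexing call behind a small interface is one way to test the record handling without a live cluster; whether the original application is structured this way is not stated in the text.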
- The producer and consumer applications run on the same compute platform (an EC2 instance).
- There is a single consumer application instance processing data from the MSK topic. However, you can try to run multiple instances of the consumer application and see how the data is distributed across the instances.
- Instead of using the Kafka CLI to produce data, a custom producer application was written in Go along with a REST endpoint to send data. This demonstrates how to write a Kafka producer application in Go and mimics the Kafka CLI.
- The volume of data used is small.
- OpenSearch Serverless collection has a Public access type.
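As a rough illustration of the custom producer mentioned in the list above, a REST endpoint that forwards JSON payloads to a topic might look like the following. It is a sketch only: the `Producer` interface, the `countingProducer` stand-in, and the `movieHandler` and `post` helpers are hypothetical, and a real implementation would call a Kafka client where the stand-in simply counts records.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Producer is a hypothetical abstraction over a Kafka client's produce call.
type Producer interface {
	Produce(topic string, value []byte) error
}

// countingProducer stands in for a real Kafka producer and only counts records.
type countingProducer struct{ sent int }

func (p *countingProducer) Produce(topic string, value []byte) error {
	p.sent++
	return nil
}

// movieHandler accepts a JSON document via POST and forwards it to the topic.
func movieHandler(p Producer, topic string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "use POST", http.StatusMethodNotAllowed)
			return
		}
		// Cap the body at 1 MiB and reject anything that is not valid JSON.
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 1<<20))
		if err != nil || !json.Valid(body) {
			http.Error(w, "body must be valid JSON", http.StatusBadRequest)
			return
		}
		if err := p.Produce(topic, body); err != nil {
			http.Error(w, "produce failed", http.StatusBadGateway)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})
}

// post sends one request to the handler in-process and returns the status code.
func post(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	p := &countingProducer{}
	h := movieHandler(p, "movies")
	ok := post(h, `{"title":"Example"}`)
	bad := post(h, `not json`)
	fmt.Println(ok, bad, p.sent) // prints: 202 400 1
}
```

To serve the handler for real, it could be passed to `http.ListenAndServe`; the port and path used by the tutorial's producer are not given in the text, so they are left out here.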
- Choose an appropriate compute platform for your consumer application based on data volume and scalability requirements - more on this below.
- Choose VPC access type for your OpenSearch Serverless collection
- Consider using Amazon OpenSearch Ingestion to create your data pipelines.
- Containers - You can package your consumer application as a Docker container (`Dockerfile` is available in the GitHub repository) and deploy it to Amazon EKS or Amazon ECS.
  - If you deploy the application to Amazon EKS, you can also consider using KEDA to auto-scale your consumer application based on the number of messages in the MSK topic.
- Serverless - It's also possible to use MSK as an event source for AWS Lambda functions. You can write your consumer application as a Lambda function and configure it to be triggered by MSK events or alternatively run it on AWS Fargate.
- Since the producer application is a REST API, you can deploy it to AWS App Runner.
- Finally, you can leverage Amazon EC2 Auto Scaling groups to auto-scale the EC2 fleet for your consumer application.
Let's take a short detour into understanding how this works with Go.
The `franz-go` Kafka client library supports IAM authentication. Here are snippets from the consumer application that show how it works in practice:
- First, the application uses the `ec2rolecreds.New()` credentials provider to retrieve the temporary IAM credentials from the EC2 instance metadata service. The EC2 instance should have an appropriate IAM role (with permissions) to execute the required operations on MSK cluster components (more on this in the subsequent sections).
- These credentials are then used to initialize the Kafka client with the `AWS_MSK_IAM` SASL authentication implementation in the `sasl_aws` package.
Note: Since there are multiple Go clients for Kafka (including Sarama), please make sure to consult their client documentation to confirm whether they support IAM authentication.
- Required IAM roles
- MSK Serverless Cluster
- OpenSearch Serverless collection
- AWS Cloud9 EC2 environment to run your application
Note that for the purposes of this tutorial, we chose the Public access type. It's recommended to select VPC for production workloads.
`HTTP` endpoint exposed by the application you just started and submit movie data (from the `movies.txt` file) in `JSON` format using `curl`:
To keep this tutorial simple and easy to follow, the amount of data has been purposely restricted to 1500 records, and the script intentionally sleeps for 1 second after sending each record to the producer. You should be able to follow along comfortably.
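The tutorial drives the endpoint with `curl`. For readers who prefer to stay in Go, the same send-then-pause loop could be sketched as below. Several things here are assumptions: that `movies.txt` holds one JSON document per line, the helper names `readRecords`, `sendAll`, and `httpSender`, and the URL in the comment. The demo uses inline sample data and a dry-run sender so that it runs without the producer being up.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// readRecords returns the non-empty lines of r (assumed: one JSON document per line).
func readRecords(r io.Reader) ([]string, error) {
	var records []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		if line := strings.TrimSpace(sc.Text()); line != "" {
			records = append(records, line)
		}
	}
	return records, sc.Err()
}

// sendAll passes each record to send, pausing after each one, and returns
// how many records were sent before the first error (if any).
func sendAll(records []string, send func(string) error, pause time.Duration) (int, error) {
	for i, rec := range records {
		if err := send(rec); err != nil {
			return i, fmt.Errorf("record %d: %w", i+1, err)
		}
		time.Sleep(pause)
	}
	return len(records), nil
}

// httpSender returns a send function that POSTs a record as JSON to url.
func httpSender(url string) func(string) error {
	return func(rec string) error {
		resp, err := http.Post(url, "application/json", strings.NewReader(rec))
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		if resp.StatusCode >= 300 {
			return fmt.Errorf("unexpected status %s", resp.Status)
		}
		return nil
	}
}

func main() {
	// Inline sample standing in for the contents of movies.txt.
	sample := "{\"title\":\"A\"}\n\n{\"title\":\"B\"}\n"
	records, err := readRecords(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}
	// Dry run. For a real run, pass httpSender("http://localhost:8080/")
	// (hypothetical URL) and a pause of time.Second, as in the tutorial.
	dryRun := func(rec string) error {
		fmt.Println("would send:", rec)
		return nil
	}
	n, err := sendAll(records, dryRun, 10*time.Millisecond)
	fmt.Println("sent", n, "records, error:", err)
}
```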
`movies` topic, you can start the consumer application to start processing data from the MSK Serverless cluster and index it in the OpenSearch Serverless collection.
`1500` movies indexed in the OpenSearch Serverless collection. You don't have to wait for it to finish though. Once there are a few hundred records, you can go ahead and navigate to Dev Tools in the OpenSearch dashboard to execute the queries below.
Use the `_source` option to retrieve the source from selected fields. For example, to retrieve only the `title`, `plot`, and `genres` fields, run the following query:
To search for `christmas` in the `title` field, run the following query:
`ratings`, `genre`, and `year` to search results based on the values of those fields. With aggregations, we can answer questions like "How many movies are in each genre?"
- Also delete IAM roles and policies
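Returning to the three kinds of queries described earlier (selecting fields with `_source`, searching for a term in `title`, and counting movies per genre), the request bodies can be generated from Go as a quick way to see their shape. This is a sketch, not the tutorial's original queries: the helper names are hypothetical, and the `genres.keyword` field name assumes a default dynamic mapping, which may not match your collection.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// toJSON renders a query body. encoding/json emits map keys in sorted order.
func toJSON(q map[string]any) string {
	b, err := json.Marshal(q)
	if err != nil {
		panic(err) // not expected for the plain maps used here
	}
	return string(b)
}

// sourceFields builds a search body that returns only the given fields.
func sourceFields(fields ...string) string {
	return toJSON(map[string]any{
		"_source": map[string]any{"includes": fields},
	})
}

// termQuery builds a search body that matches a single term in one field.
func termQuery(field, value string) string {
	return toJSON(map[string]any{
		"query": map[string]any{
			"term": map[string]any{field: map[string]any{"value": value}},
		},
	})
}

// termsAggregation builds a body that buckets documents by a field's values
// and returns no individual hits ("size": 0).
func termsAggregation(name, field string) string {
	return toJSON(map[string]any{
		"size": 0,
		"aggs": map[string]any{
			name: map[string]any{"terms": map[string]any{"field": field}},
		},
	})
}

func main() {
	fmt.Println(sourceFields("title", "plot", "genres"))
	fmt.Println(termQuery("title", "christmas"))
	// Assumption: genres is indexed with a keyword sub-field.
	fmt.Println(termsAggregation("genres", "genres.keyword"))
}
```

Each printed body can be pasted into Dev Tools under a `GET movies/_search` request line.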
Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.