Build a Real-Time Streaming Analytics Application on Apache Kafka
Learn how to build an end-to-end real-time streaming analytics application on AWS using Apache Kafka and Apache Flink
Step 2: Build the Flink Application
Step 3: Upload the File to Amazon S3
Step 4: Create a Stack using AWS CloudFormation
Step 5: Create the MSK Serverless Cluster
Step 6: Create the Kafka Topics
Step 7: Start a Container Application to Generate Clickstream Data
Step 8: Check Schema in AWS Glue Schema Registry
Step 9: Consume Clickstream Data Using Managed Service for Apache Flink
Step 10: View Clickstream Data in the Amazon OpenSearch Dashboard
Building an end-to-end real-time streaming application with an Apache Kafka producer and Kafka consumer can be quite challenging.
About | |
---|---|
✅ AWS Level | Intermediate - 200 |
⏱ Time to complete | 45 mins - 60 mins |
💰 Cost to complete | USD 4.00 |
🧩 Prerequisites | An AWS account; an IAM user with permissions to create AWS resources; basic understanding of the CLI; Java and Apache Maven installed |
💻 Code | Code sample used in tutorial on GitHub |
📢 Feedback | Any feedback, issues, or just a 👍 / 👎 ? |
⏰ Last Updated | 2023-06-20 |
- Start a Serverless Amazon MSK Cluster
- Produce streaming data to MSK Serverless using Kafka Client Container
- Consume and process the streaming data using Amazon Managed Service for Apache Flink (MSF)
- Visualise streaming data in Amazon OpenSearch Service

- Java JDK
- Apache Maven

The GitHub repository `build-on-aws/real-time-streaming-analytics-application-using-apache-kafka` contains the files we need to get started. Download the repository to your local machine, change into the `flink-clickstream-consumer` folder inside it, and build the Flink application from that folder with Apache Maven. The build produces a JAR file named `ClickStreamProcessor-1.0.jar` in `flink-clickstream-consumer/target/`. If you want to better understand the inner workings of the Flink application, have a look at the `ClickstreamProcessor.java` file in the `src` directory. This is the entry point of the Java application, where the `main` function resides.

- Log into your AWS account, navigate to the Amazon S3 console, and click `Create bucket`.

- Provide a unique bucket name of your choice, choose an AWS Region (e.g. `us-east-1`), and click `Create bucket` at the bottom of the page. Take note of your bucket name.

- Click on the newly created bucket and click `Upload` to upload the following file to the S3 bucket.

- Click `Add files` and select the JAR file `ClickStreamProcessor-1.0.jar` that you generated in the previous step.

- `Amazon OpenSearch Cluster`: This is where we can visualize the consumed clickstream data. It is deployed in private subnets of a VPC.
- `Amazon ECS Cluster + Task definition`: The container application that generates the sample clickstream data runs inside the ECS cluster as a Fargate task.
- `Amazon Managed Service for Apache Flink`: This is where the Flink application runs, consuming the clickstream data from the MSK cluster, processing it, and writing it to the OpenSearch Service.
- `Amazon EC2 Instance (Kafka client)`: This EC2 instance serves as a Kafka client and allows us to interact with the MSK cluster, for example to create Kafka topics.
- `Amazon EC2 Instance (Nginx proxy)`: This EC2 instance serves as an Nginx proxy and allows us to access the OpenSearch Dashboard from outside of the VPC, i.e., from the Internet.
- `Security groups`: Security groups help us to control the traffic that is allowed to reach and leave a particular resource.
- `IAM roles`: An IAM role is an IAM identity that has specific permissions attached to it and can be assumed by an IAM user or an AWS service. For example, an IAM role can be used to grant permissions to an application running on an EC2 instance that requires access to a specific Amazon S3 bucket.

- Navigate to the CloudFormation console and click on `Create Stack`.
- Choose `Upload a template file` and click `Choose file` to upload the CloudFormation template file `cf_template.yml` that can be found in the root directory of the downloaded repository. Then, click `Next`.

- Provide the stack with a `Stack name` of your choice (e.g. `msk-serverless-stack`). Additionally, you have to provide a value for the parameter `AssetsBucketName`: enter the name of the S3 bucket that you created earlier. You can leave the default `ClickstreamProcessor-1.0.jar` as `KdaAppKey` unless you have changed the name of the JAR file that you generated earlier. Leave `LatestAmiId` as well as `OpenSearchmasterUserName` as is. Click `Next`.

- Scroll down the page `Configure stack options` and click `Next`.
- Scroll down the page `Review <Your_Stack_Name>`. Make sure to tick the box that reads `I acknowledge that AWS CloudFormation might create IAM resources with custom names`. Lastly, click `Submit` to create the CloudFormation stack.

Wait until the stack status changes from `CREATE_IN_PROGRESS` to `CREATE_COMPLETE`. Note: This can take some time.

Once the status is `CREATE_COMPLETE`, the resources that were defined in the CloudFormation template have been created in your AWS account. However, a few more resources and configurations are required before we have an end-to-end real-time streaming application.

- Navigate to the Amazon MSK console and click `Create cluster`.
- Choose `Custom create` and provide a cluster name of your choice (e.g. `msk-cluster`). Select `Serverless` as cluster type. Then, click `Next`.

- In the `Networking` view, select the custom VPC named `MMVPC`. Then, click `Add subnet` to add a third subnet and choose the three available private subnets (`PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo`, `PrivateSubnetMSKThree`) for the zones `us-east-1a`, `us-east-1b` and `us-east-1c`.
- Rather than the default security group, select the security group named `MSK Security Group`. Lastly, click `Next`.

- Click `Next`.
- Click `Next`.
- Finally, click `Create cluster` to create the MSK Serverless cluster.
- Once your MSK Serverless cluster status changes to `Active`, click on `View client information`.

- Take note of the endpoint of your MSK Serverless cluster. Notice that we use IAM access control to handle the authentication to the MSK cluster.
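
To make the IAM access control mentioned above more concrete, here is a minimal sketch of the client settings a Kafka client typically needs for IAM authentication against Amazon MSK. The property values follow the `aws-msk-iam-auth` library; the class name `MskIamClientConfig` and the helper `iamClientProperties` are hypothetical names chosen for illustration, and the configuration actually used by the applications in this tutorial may differ. A real client also needs the `aws-msk-iam-auth` JAR on its classpath.

```java
import java.util.Properties;

final class MskIamClientConfig {

    /** Builds Kafka client properties for IAM access control on Amazon MSK (illustrative helper). */
    static Properties iamClientProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // IAM access control runs over TLS with the SASL mechanism AWS_MSK_IAM.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        // Both classes below are provided by the aws-msk-iam-auth library.
        props.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder taken from the tutorial; replace it with your own cluster endpoint.
        Properties props = iamClientProperties("<Your_Cluster_Endpoint>");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The credentials themselves are not part of these properties: the library resolves them from the environment, which in this tutorial would be the IAM roles created by the CloudFormation stack.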

- Navigate to the Amazon EC2 console. On the EC2 home page click on `Instances (running)`.

- On the EC2 Instances page select the checkbox for the instance named `KafkaClientInstance` and click on the `Connect` button on the top right.

- On the page `Connect to instance`, make sure to select `Session Manager` and click the `Connect` button. This opens a new tab with an EC2 terminal window.
- In the terminal window, switch to the user `ec2-user` (e.g. with `sudo su - ec2-user`).

- Set the shell variable `BS` to your MSK cluster endpoint (e.g. `export BS=<Your_Cluster_Endpoint>`). Please replace `<Your_Cluster_Endpoint>` with the endpoint you noted down after you created the MSK Serverless cluster.

- Then, create the Kafka topic from this terminal.

- You should see a single MSK topic that has been created: `clickstream`.

Next, we produce sample clickstream data to the topic `clickstream`. For that we'll deploy a serverless Amazon ECS Fargate container that runs an application generating sample clickstream data and writing it to the MSK Serverless cluster.

- Navigate to the Amazon ECS console. On the left side menu click on `Task Definitions` to view all available Task definitions. Select the checkbox of the available Task definition and select `Run task` from the `Deploy` menu.

- On the Run Task page, select the existing cluster (`msk-serverless-[...]-cluster`) and leave the default settings.

- Expand the `Networking` section. Change the default VPC to `MMVPC`. Similar to before, select the three subnets `PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo` and `PrivateSubnetMSKThree`. Finally, uncheck the default security group and select the security group whose name contains `-ProducerECSTaskSecurityGroup-`.

- Expand the `Container overrides` section. For `BOOTSTRAP_STRING`, enter the endpoint of your MSK Serverless cluster (noted down earlier from `View client information` on the MSK cluster console page).

- Finally, click the `Create` button.
- Wait for your task to change to the `Running` status.

The running task now produces sample clickstream data to the topic `clickstream`.

- Navigate to the AWS Glue console. Select `Stream schema registries` under `Data Catalog` from the left menu. You can see the schema registry named `serverless`. Click on it.
- You can see the available schemas of the schema registry `serverless`. Click on the schema `clickstream` to see the different schema versions. You should see version `1` here.

- Click on the version `1` to see the Avro schema of the clickstream data produced by the ECS task.

- Navigate to the Managed Apache Flink console and click on the streaming application `KDAFlinkCLickstream-msk-serverless-stack`.
- Configure and update the application by clicking on the `Configure` button.

- Scroll down to the `Runtime properties`. Update `BootstrapServers` to the MSK Serverless cluster endpoint you have written down earlier. Keep the rest of the values as default.

- Finally, save your changes.
- Click on the `Run` button to run the Flink application. Choose `Run without snapshot`.

- Once the Flink application is running, click on `Open Apache Flink dashboard` to open the Flink dashboard.

- Click on `Running Jobs` on the left side of the menu. Click on `Flink Streaming Job` to access the details of the running job.

- This opens a screen with a directed acyclic graph (DAG), representing the flow of data throughout each of the operators of your application. Each blue box in the job workflow represents a series of chained operators, known as Tasks in Apache Flink.

First, the application uses `EventTimeSessionWindows` to extract user sessions from the clickstream data by grouping events that are within a specified time gap of each other. Then, the application applies `TumblingEventTimeWindows` to calculate specific aggregation characteristics within a certain period of time by dividing the clickstream into fixed-size, non-overlapping windows. For example, that could involve calculating the aggregate count of user sessions that have made a purchase within the past 10 seconds.

You can see the metrics `Bytes Received`, `Bytes Sent`, `Records Received` and `Records Sent` at the bottom of the screen. Note that Flink can only measure the bytes sent or received between operators. That's why you can't see the metrics for the source or sink operator, as the data is coming from outside of Flink.
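
The two window types described above can be illustrated without any Flink dependency. The following plain-Java sketch is not the code of the tutorial's Flink application and does not use the Flink API: it only mimics the idea of a session gap and of fixed-size, non-overlapping windows. All names (`WindowingSketch`, `sessions`, `tumblingWindowStart`, `countPerWindow`) are hypothetical, the 5-second gap is an assumed value, and assigning a session to the window of its last event is a simplification made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowingSketch {

    /**
     * Groups one user's event timestamps (sorted ascending, in ms) into sessions.
     * A new session starts whenever the gap to the previous event exceeds gapMs.
     * Each result entry is {firstTimestamp, lastTimestamp} of one session.
     */
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] <= gapMs) {
                current[1] = ts;                     // extend the open session
            } else {
                result.add(new long[] {ts, ts});     // start a new session
            }
        }
        return result;
    }

    /** Start of the fixed-size, non-overlapping window that contains timestampMs (>= 0). */
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Counts sessions per tumbling window, keyed by window start (session assigned by its last event). */
    static Map<Long, Integer> countPerWindow(List<long[]> sessions, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long[] session : sessions) {
            counts.merge(tumblingWindowStart(session[1], sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] clicks = {1_000, 2_500, 4_000, 12_000, 13_000, 27_000};
        List<long[]> found = sessions(clicks, 5_000);               // assumed 5 s session gap
        Map<Long, Integer> counts = countPerWindow(found, 10_000);  // 10 s tumbling windows
        // prints "3 sessions, per-window counts: {0=1, 10000=1, 20000=1}"
        System.out.println(found.size() + " sessions, per-window counts: " + counts);
    }
}
```

In the real application, Flink additionally handles keying by user, event-time watermarks, and late data, none of which this sketch attempts to model.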
- Navigate to the CloudFormation console and click on the stack that we created earlier. Go to the `Outputs` tab of the stack.
- Take note of the `OpenSearchMasterUserName` and `OpenSearchMasterPassword`. We will need the values in the next step.

- Click on the `OpenSearchDashboardEndpoint` to open the OpenSearch Dashboard login page in a new tab. As the OpenSearch Service is deployed in a VPC, we rely on an Nginx reverse proxy to access the OpenSearch Dashboard from outside of the VPC. Note that we are using a self-signed certificate for Nginx. However, we strongly recommend using a valid certificate for production deployments.
- If you are accessing the URL using Google Chrome, click on the `Advanced` button and click on `Proceed to <Your_EC2_DNS>`.

- Use the `OpenSearchMasterUserName` and `OpenSearchMasterPassword` from the previous step to log into OpenSearch Dashboards.

- Select the `Global` tenant in the popup dialog box.
- Click on the hamburger menu on the left of the screen and click on `Dashboards`.

- In the `Dashboards` view select the dashboard named `Clickstream Dashboard` to see the plotted data.

See `AmazonOpenSearchSink.java` in the downloaded repository to view the implementation of the connector.

- Delete the MSK Serverless Cluster under the `Actions` menu.

- Delete the CloudFormation Stack.

- Empty and delete the S3 Bucket you have created earlier.

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.