Build a Real-Time Streaming Analytics Application on Apache Kafka
Learn how to build an end-to-end real-time streaming analytics application on AWS using Apache Kafka and Apache Flink
Step 2: Build the Flink Application
Step 3: Upload the File to Amazon S3
Step 4: Create a Stack using AWS CloudFormation
Step 5: Create the MSK Serverless Cluster
Step 6: Create the Kafka Topics
Step 7: Start a Container Application to Generate Clickstream Data
Step 8: Check Schema in AWS Glue Schema Registry
Step 9: Consume Clickstream Data Using Managed Service for Apache Flink
Step 10: View Clickstream Data in the Amazon OpenSearch Dashboard
However, building such an end-to-end real-time streaming application with an Apache Kafka producer and Kafka consumer can be quite challenging.
About | |
---|---|
✅ AWS Level | Intermediate - 200 |
⏱ Time to complete | 45 mins - 60 mins |
💰 Cost to complete | USD 4.00 |
🧩 Prerequisites | - An AWS Account - An IAM user that has the access to create AWS resources - Basic understanding of CLI - Java and Apache Maven installed |
💻 Code | Code sample used in tutorial on GitHub |
📢 Feedback | Any feedback, issues, or just a 👍 / 👎 ? |
⏰ Last Updated | 2023-06-20 |
- Start a Serverless Amazon MSK Cluster
- Produce streaming data to MSK Serverless using Kafka Client Container
- Consume and process the streaming data using Amazon Managed Service for Apache Flink
- Visualise streaming data in Amazon OpenSearch Service
[Figure: Overview of the proposed architecture with the featured AWS services]
- Java JDK
- Apache Maven
The repository `build-on-aws/real-time-streaming-analytics-application-using-apache-kafka` contains the required files that help us to get started. Download the repository to your local machine.

Change to the `flink-clickstream-consumer` folder inside the previously downloaded repository.

Build the Flink application inside the `flink-clickstream-consumer` folder by running `mvn package`.

[Figure: 'Build Success' message after running mvn package]

The build places a JAR file named `ClickStreamProcessor-1.0.jar` in `flink-clickstream-consumer/target/`. If you want to better understand the inner workings of the Flink application, you can have a look at the `ClickstreamProcessor.java` file in the `src` directory. This is the entry point of the Java application where the `main` function resides.

- Log in to your AWS account, navigate to the Amazon S3 console, and click `Create bucket`.
[Figure: 'Create Bucket' button in the S3 console.]
- Provide a unique bucket name of your choice, choose an AWS region (e.g. `us-east-1`), and click `Create Bucket` at the bottom of the page. Take note of your bucket name.
[Figure: 'Create Bucket' page where to specify a bucket name]
- Click on the newly created bucket and click `Upload` to upload the following file to the S3 bucket.
[Figure: 'Upload' button within the bucket objects view]
- Click `Add files` and select the JAR file `ClickStreamProcessor-1.0.jar` that you generated earlier.
[Figure: 'Add files' button to select and upload local files to the S3 bucket]
- `Amazon OpenSearch Cluster`: This is where we can visualize the consumed clickstream data. It is deployed in private subnets of a VPC.
- `Amazon ECS Cluster + Task definition`: The container application that generates the sample clickstream data runs inside the ECS cluster as a Fargate task.
- `Amazon Managed Service for Apache Flink`: This is where the Flink application runs, consuming the clickstream data from the MSK cluster, processing it and writing it to the OpenSearch Service.
- `Amazon EC2 Instance (Kafka client)`: This EC2 instance serves as a Kafka client and allows us to interact with the MSK cluster, for example to create Kafka topics.
- `Amazon EC2 Instance (Nginx proxy)`: This EC2 instance serves as an Nginx proxy and allows us to access the OpenSearch Dashboard from outside of the VPC, i.e., from the Internet.
- `Security groups`: Security groups help us to control the traffic that is allowed to reach and leave a particular resource.
- `IAM roles`: An IAM role is an IAM identity that has specific permissions attached to it and can be assumed by an IAM user or an AWS service. For example, an IAM role can be used to grant permissions to an application running on an EC2 instance that requires access to a specific Amazon S3 bucket.

- Navigate to the CloudFormation console and click on `Create Stack`.
- Choose `Upload a template file` and click `Choose file` to upload the CloudFormation template file `cf_template.yml` that can be found in the root directory of the downloaded repository. Then, click `Next`.
[Figure: 'Create stack' view inside the CloudFormation console]
- Provide the stack with a `Stack name` of your choice (e.g. `msk-serverless-stack`). Additionally, you have to provide a value for the parameter `AssetsBucketName`. Enter the name of the S3 bucket that you created earlier. You can leave the default `ClickstreamProcessor-1.0.jar` as `KdaAppKey` unless you have changed the name of the JAR file that you generated earlier. Leave `LatestAmiId` as well as `OpenSearchmasterUserName` as is. Click `Next`.
[Figure: 'Specify stack details' view within the CloudFormation console]
- Scroll to the bottom of the `Configure stack options` page and click `Next`.
- Scroll to the bottom of the `Review <Your_Stack_Name>` page. Make sure to tick the box that reads `I acknowledge that AWS CloudFormation might create IAM resources with custom names`. Lastly, click `Submit` to create the CloudFormation stack.

Wait until the stack status changes from `CREATE_IN_PROGRESS` to `CREATE_COMPLETE`. Note: This can take some time.

[Figure: 'Stacks' view inside the CloudFormation console to view the status of the stacks]

Once the stack status is `CREATE_COMPLETE`, the resources that were defined in the CloudFormation template have been created in your AWS account. However, a few more resources and configurations are required until we end up with an end-to-end real-time streaming application.

- Navigate to the Amazon MSK console and click `Create cluster`.
- Choose `Custom create` and provide a cluster name of your choice (e.g. `msk-cluster`). Select `Serverless` as cluster type. Then, click `Next`.
[Figure: 'Cluster Settings' as part of 'Create Cluster' view within the Amazon MSK console]
- In the `Networking` view, select the custom VPC named `MMVPC`. Then, click `Add subnet` to add a third subnet and choose the three available private subnets (`PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo`, `PrivateSubnetMSKThree`) for the different zones in `us-east-1a`, `us-east-1b` and `us-east-1c`.
- Rather than the default security group, select the security group named `MSK Security Group`. Lastly, click `Next`.
[Figure: 'Networking Settings' as part of 'Create Cluster' view within the Amazon MSK console]
- Click `Next`.
- Click `Next`.
- Finally, click `Create cluster` to create the MSK Serverless cluster.
- Once your MSK Serverless cluster status changes to `Active`, click on `View client information`.
[Figure: 'Cluster summary' view of the created MSK cluster within the MSK console]
- Take note of the endpoint of your MSK Serverless cluster. Notice that we use IAM access control to handle the authentication to the MSK cluster.
[Figure: View client information of the created MSK cluster]
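
To make the note about IAM access control more concrete, here is a minimal sketch of the client settings a Kafka client typically needs for IAM authentication against MSK. It is not taken from this tutorial's code: the class name `MskIamClientConfig` and the `build` helper are hypothetical, and the property values are the ones documented by the `aws-msk-iam-auth` library, which would have to be on the classpath for a real client to connect.

```java
import java.util.Properties;

public class MskIamClientConfig {

    // Builds Kafka client properties for IAM access control.
    // bootstrapServers is the endpoint noted under "View client information".
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // IAM authentication runs over TLS with a SASL mechanism
        // provided by the aws-msk-iam-auth library.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        props.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder endpoint for illustration only; use your own value.
        Properties props = build("<Your_Cluster_Endpoint>");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object could be passed to a Kafka producer, consumer, or admin client. Credentials are not part of the configuration; they would come from the IAM role attached to the resource that runs the client.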
- Navigate to the Amazon EC2 console. On the EC2 home page click on `Instances (running)`.
[Figure: Home page of the EC2 console]
- On the EC2 Instances page select the checkbox for the instance named `KafkaClientInstance` and click on the `Connect` button on the top right.
[Figure: 'Connect' button within the EC2 instances view]
- On the page `Connect to instance`, make sure to select `Session Manager` and click the `Connect` button. This opens a new tab with an EC2 terminal window.
- In the terminal window execute the following command to change to `ec2-user`:
[Figure: Terminal window view when running the sudo command]
- Execute the command below to store your MSK cluster endpoint in the shell variable `BS`. Replace `<Your_Cluster_Endpoint>` with the endpoint you noted down after you created the MSK Serverless cluster.
[Figure: Terminal window view when running the export command]
- Then, execute the following command to create the Kafka topic.
[Figure: Terminal window view when running the create topics command]
- You should see a single MSK topic that has been created: `clickstream`.

Next, we generate sample clickstream data for the Kafka topic `clickstream`. For that we'll deploy a serverless Amazon ECS Fargate container which runs an application that produces sample clickstream data to the MSK Serverless cluster.

- Navigate to the Amazon ECS console. In the left side menu, click on `Task Definitions` to view all available task definitions. Select the checkbox of the available task definition and select `Run task` from the `Deploy` menu.
[Figure: 'Run task' option within the Task definitions in the ECS console]
- On the Run Task page, select the existing cluster (`msk-serverless-[...]-cluster`) and leave the default settings.
[Figure: 'Run Task' page view within the ECS console]
- Expand the `Networking` section. Change the default VPC to `MMVPC`. Similar to before, select the three subnets `PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo` and `PrivateSubnetMSKThree`. Finally, uncheck the default security group and select the security group whose name contains `-ProducerECSTaskSecurityGroup-`.
[Figure: Networking settings within the 'Run Task' view]
- Expand the `Container overrides` section. For `BOOTSTRAP_STRING`, enter your MSK Serverless cluster endpoint (noted earlier under `View client information` on the MSK cluster console page).
[Figure: Container overrides within the 'Run Task' view]
- Finally, click the `Create` button.
- Wait for your task to change to the `Running` status.
[Figure: 'Tasks' view of the created ECS cluster]

Next, check the schema registered in the AWS Glue Schema Registry for the topic `clickstream`.

- Navigate to the AWS Glue console. Select `Stream schema registries` under `Data Catalog` from the left menu. You can see the schema registry named `serverless`. Click on it.
- You can see the available schemas of the schema registry `serverless`. Click on the schema `clickstream` to see the different schema versions. You should see version `1` here.
[Figure: 'Schema properties' view including 'Schema versions' within the Glue console]
- Click on the version `1` to see the Avro schema of the clickstream data produced by the ECS task.
[Figure: Schema version definition of the click event]
- Navigate to the Managed Apache Flink console and open the streaming application `KDAFlinkCLickstream-msk-serverless-stack`.
- Configure and update the application by clicking on the `Configure` button.
[Figure: Flink application view within the Managed Apache Flink console]
- Scroll down to the `Runtime properties`. Update `BootstrapServers` to the MSK Serverless cluster endpoint you noted down earlier. Keep the default values for the rest.
[Figure: Runtime properties of the Flink application within the Managed Apache Flink console]
- Finally, save your changes.
- Click on the `Run` button to run the Flink application. Choose `Run without snapshot`.
[Figure: KDA Run Application]
- Once the Flink application is running, click on `Open Apache Flink dashboard` to open the Flink dashboard.
[Figure: 'Open Apache Flink dashboard' button in the Flink application]
- Click on `Running Jobs` on the left side of the menu. Click on `Flink Streaming Job` to access the details of the running job.
[Figure: Run application view with the 'Application restore configuration' options]
- This opens a screen with a directed acyclic graph (DAG), representing the flow of data through each of the operators of your application. Each blue box in the job workflow represents a series of chained operators, known as Tasks in Apache Flink.

The application uses `EventTimeSessionWindows` to extract user sessions from the clickstream data by grouping events that are within a specified time gap of each other. Then, the application applies `TumblingEventTimeWindows` to calculate specific aggregations within a certain period of time by dividing the clickstream into fixed-size, non-overlapping windows. For example, that could involve calculating the aggregate count of user sessions that have made a purchase within the past 10 seconds.

You can also see metrics such as `Bytes Received`, `Bytes Sent`, `Records Received` and `Records Sent` at the bottom of the screen. Note that Flink can only measure the bytes sent or received between operators. That's why you can't see the metrics for the source or sink operator, as the data is coming from outside of Flink.

[Figure: DAG to show the flow of data and data statistics]
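
The two windowing ideas described above can be illustrated with a small plain-Java sketch. This is not the tutorial's Flink code and does not use the Flink API: the class `WindowSketch`, both helper methods, the sample timestamps, and the 5-second session gap are assumptions made for illustration (only the 10-second window size echoes the example in the text). Watermarks, late events, and exact boundary handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowSketch {

    // Session-style grouping: events belong to the same session as long as
    // the gap to the previous event does not exceed gapMs.
    public static List<List<Long>> sessions(List<Long> timestamps, long gapMs) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Tumbling-style counting: each event falls into exactly one fixed-size,
    // non-overlapping window, identified here by the window start time.
    public static Map<Long, Integer> tumblingCounts(List<Long> timestamps, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - Math.floorMod(ts, sizeMs);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds.
        List<Long> events = List.of(1_000L, 2_000L, 3_000L, 20_000L, 21_000L, 45_000L);
        System.out.println(sessions(events, 5_000L));        // three sessions
        System.out.println(tumblingCounts(events, 10_000L)); // {0=3, 20000=2, 40000=1}
    }
}
```

In a Flink job the same grouping is driven by event time and watermarks and runs per key, so this sketch only conveys the shape of the result, not the runtime behaviour.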
- Navigate to the CloudFormation console and click on the stack that we created earlier. Go to the `Outputs` tab of the stack.
- Take note of the `OpenSearchMasterUserName` and `OpenSearchMasterPassword`. We will need the values in the next step.
[Figure: 'Outputs view' of the created stack in the CloudFormation console]
- Click on the `OpenSearchDashboardEndpoint` to open the OpenSearch Dashboard login page in a new tab. As the OpenSearch Service is deployed in a VPC, we rely on an Nginx reverse proxy to access the OpenSearch Dashboard from outside of the VPC. Note that we are using a self-signed certificate for Nginx. However, we strongly recommend using a valid certificate for production deployments.
- If you are accessing the URL using Google Chrome, click on the `Advanced` button and click on `Proceed to <Your_EC2_DNS>`.
[Figure: Warning displayed in Chrome]
- Use the `OpenSearchMasterUserName` and `OpenSearchMasterPassword` from the previous step to log in to OpenSearch Dashboards.
[Figure: Login screen of OpenSearch Dashboard]
- Select the `Global` tenant in the popup dialog box.
- Click on the hamburger menu on the left of the screen and click on `Dashboards`.
[Figure: OpenSearch Dashboard with open menu on the left]
- In the `Dashboards` view select the dashboard named `Clickstream Dashboard` to see the plotted data:
[Figure: Graphs within OpenSearch Dashboard]

See `AmazonOpenSearchSink.java` in the downloaded repository to view the implementation of the connector.

- Delete the MSK Serverless Cluster under the `Actions` menu.
[Figure: 'Delete' under 'Actions' in the MSK console]
- Delete the CloudFormation Stack.
[Figure: 'Delete' button within the 'Stacks' view of the CloudFormation console]
- Empty and delete the S3 bucket you created earlier.
[Figure: 'Delete' button within the S3 console]

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.