Auto-Scaling Amazon Kinesis Data Streams Applications on Kubernetes
Learn how to use KEDA to automatically scale data processing workloads deployed to Amazon EKS, based on the shard count of an Amazon Kinesis Data Stream.
- A Kinesis data stream is a set of shards. Each shard has a sequence of data records.
- The producers continually push data to Kinesis Data Streams, and the consumers process the data in real time.
- A partition key is used to group data by shard within a stream.
- Kinesis Data Streams segregates the data records belonging to a stream into multiple shards.
- It uses the partition key that is associated with each data record to determine which shard a given data record belongs to (see the sketch after this list).
- Consumers get records from Amazon Kinesis Data Streams, process them, and store the results in destinations such as Amazon DynamoDB, Amazon Redshift, or Amazon S3.
- These consumers are also known as Amazon Kinesis Data Streams applications.
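For intuition, Kinesis routes a record by taking the MD5 hash of its partition key, interpreting the 16-byte digest as an unsigned 128-bit integer, and finding the shard whose hash key range contains that value. Here is a minimal Java sketch of the idea - the two-shard range split is illustrative, not read from a real stream:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class PartitionKeyRouting {
    public static void main(String[] args) throws Exception {
        // Kinesis hashes the partition key with MD5 and treats the 16-byte
        // digest as an unsigned 128-bit integer.
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest("user1@foo.com".getBytes(StandardCharsets.UTF_8));
        BigInteger hashKey = new BigInteger(1, digest);

        // In a 2-shard stream, each shard owns half of the hash key space
        // (0 .. 2^128 - 1). These ranges are illustrative; the real ranges
        // come from the stream's shard metadata (e.g. via the ListShards API).
        BigInteger shardBoundary = BigInteger.valueOf(2).pow(127);
        String shard = hashKey.compareTo(shardBoundary) < 0
                ? "shardId-000000000000" : "shardId-000000000001";

        System.out.println("partition key user1@foo.com -> " + shard);
    }
}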
- One of the methods of developing custom consumer applications that can process data from Kinesis data streams is to use the Kinesis Client Library (KCL).
KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing and scalability. It connects to the data stream, enumerates the shards within the data stream, and uses leases to coordinate shard associations with its consumer applications. KCL pulls data records from the data stream, pushes the records to the corresponding record processor, and checkpoints processed records. More importantly, it balances shard-worker associations (leases) when the worker instance count changes or when the data stream is re-sharded (shards are split or merged). This means that you can scale your Kinesis Data Streams application by simply adding more instances, since KCL will automatically balance the shards across them.

KEDA is a Kubernetes-based event-driven autoscaling component that can monitor event sources like Kinesis and scale the underlying Deployments (and Pods) based on the number of events needing to be processed.

In this tutorial, we will work with a Java application that uses the Kinesis Client Library (KCL) 2.x to consume data from a Kinesis Data Stream. It will be deployed to a Kubernetes cluster on Amazon EKS and will be scaled automatically using KEDA. The application includes an implementation of the ShardRecordProcessor that processes data from the Kinesis stream and persists it to a DynamoDB table. We will use the AWS CLI to produce data to the Kinesis stream and observe the scaling of the application.
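To give a sense of what the KCL side of the application looks like, here is a trimmed-down sketch of a KCL 2.x ShardRecordProcessor. The interface and lifecycle methods are from the KCL 2.x API; the processing logic is simplified for illustration (the actual application in the repository persists each record to DynamoDB):

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class DemoRecordProcessor implements ShardRecordProcessor {

    @Override
    public void initialize(InitializationInput input) {
        // Called once when KCL assigns this shard (lease) to the worker.
        System.out.println("Initializing for shard " + input.shardId());
    }

    @Override
    public void processRecords(ProcessRecordsInput input) {
        // KCL pushes batches of records that it pulled from the shard.
        input.records().forEach(r ->
                System.out.println(r.partitionKey() + " -> "
                        + java.nio.charset.StandardCharsets.UTF_8.decode(r.data())));
        try {
            // Checkpoint so a restarted worker resumes after these records.
            input.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput input) {
        // Another worker took over this shard (e.g. after a scale-out).
    }

    @Override
    public void shardEnded(ShardEndedInput input) {
        try {
            // Required: checkpoint at shard end (happens after a re-shard).
            input.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput input) {
        try {
            input.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            throw new RuntimeException(e);
        }
    }
}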
Before diving in, here is a quick overview of KEDA. KEDA is an open-source CNCF project that's built on top of native Kubernetes primitives such as the Horizontal Pod Autoscaler and can be added to any Kubernetes cluster. Here is a high-level overview of its key components (you can refer to the KEDA documentation for a deep-dive):
- The keda-operator-metrics-apiserver component in KEDA acts as a Kubernetes metrics server that exposes metrics for the Horizontal Pod Autoscaler.
- A KEDA scaler integrates with an external system (such as Redis) to fetch these metrics (e.g. the length of a list) and drives the auto-scaling of any container in Kubernetes based on the number of events needing to be processed.
- The role of the keda-operator component is to activate and deactivate Deployments, i.e. scale to and from zero.
First, create an Amazon EKS cluster. Using eksctl, this can be as easy as:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
For details, refer to the Getting started with Amazon EKS – eksctl guide.
Next, create the DynamoDB table that the application will use to persist data:
aws dynamodb create-table \
--table-name users \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Create a Kinesis stream with two shards:
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
Clone the GitHub repository and change into the project directory:
git clone https://github.com/abhirockzz/kinesis-keda-autoscaling
cd kinesis-keda-autoscaling
This tutorial uses YAML manifests to install KEDA, but you could also use Helm charts. Install KEDA:
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
Verify the installation:
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
Since the KEDA operator and the KCL consumer application run as Deployments in EKS, we will use IAM Roles for Service Accounts (IRSA) to provide the necessary permissions:
- The KEDA operator needs to be able to get the shard count for a Kinesis stream - it does so using the DescribeStreamSummary API.
- The application (the KCL library, to be specific) needs to interact with Kinesis and DynamoDB, so it needs a set of IAM permissions to do so (see the sketch after this list).
Start by configuring IRSA for the KEDA operator. Set your AWS account ID and OIDC identity provider as environment variables:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
Create a JSON file with the trusted entities for the role:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json
Now, create the IAM role and attach the policy (take a look at the policy_kinesis_keda.json file for details):
export ROLE_NAME=keda-operator-kinesis-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"
aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-kinesis-policy
Associate the IAM role with the KEDA operator service account:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
You will need to restart the KEDA operator Deployment for this to take effect:
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
Next, configure IRSA for the KCL consumer application. Start by creating a Kubernetes service account:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kcl-consumer-app-sa
EOF
Create a JSON file with the trusted entities for the role:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:default:kcl-consumer-app-sa"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust.json
Now, create the IAM role and attach the policy (take a look at the policy.json file for details):
export ROLE_NAME=kcl-consumer-app-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"
aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/kcl-consumer-app-policy
Associate the IAM role with the application service account:
kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/kcl-consumer-app-sa
Now, build the application and push the Docker image to a private repository in Amazon ECR (refer to the Dockerfile for details):
# create runnable JAR file
mvn clean compile assembly\:single
# build docker image
docker build -t kcl-consumer-app .
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# create a private ECR repo
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
aws ecr create-repository --repository-name kcl-consumer-app --region us-east-1
# tag and push the image
docker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
Update consumer.yaml to include the Docker image you just pushed to ECR. The rest of the manifest remains the same:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kcl-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kcl-consumer
  template:
    metadata:
      labels:
        app: kcl-consumer
    spec:
      serviceAccountName: kcl-consumer-app-sa
      containers:
        - name: kcl-consumer
          image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
          imagePullPolicy: Always
          env:
            - name: STREAM_NAME
              value: kinesis-keda-demo
            - name: TABLE_NAME
              value: users
            - name: APPLICATION_NAME
              value: kinesis-keda-demo
            - name: AWS_REGION
              value: us-east-1
            - name: INSTANCE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
Create the Deployment:
kubectl apply -f consumer.yaml
# verify Pod transition to Running state
kubectl get pods -w
Once the Pod is running, the KCL library should jump into action. The first thing it will do is create a "control table" in DynamoDB - its name should be the same as that of the KCL application (in this case, kinesis-keda-demo). While this happens, you can check the application logs:
kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsonpath={.items..metadata.name})
Once the control table has been created, you can inspect it - notice the leaseOwner attribute:
aws dynamodb describe-table --table-name kinesis-keda-demo
aws dynamodb scan --table-name kinesis-keda-demo
It's time to produce some data. Send a few records to the Kinesis stream using the AWS CLI:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user1@foo.com --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user2@foo.com --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user3@foo.com --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user4@foo.com --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64)
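If you prefer producing records programmatically, here is a rough equivalent sketch using the AWS SDK for Java v2 (the SDK handles the binary encoding, so there is no manual base64 step):

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

public class ProducerDemo {
    public static void main(String[] args) {
        KinesisClient kinesis = KinesisClient.builder().build();

        // The partition key (the user's email) determines the target shard.
        PutRecordRequest request = PutRecordRequest.builder()
                .streamName("kinesis-keda-demo")
                .partitionKey("user1@foo.com")
                .data(SdkBytes.fromUtf8String("{\"name\":\"user1\", \"city\":\"new york\"}"))
                .build();

        System.out.println("sequence number: "
                + kinesis.putRecord(request).sequenceNumber());
    }
}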
The KCL application picks up each record and persists it to the target DynamoDB table (which is named users in this case). Check the table to verify the records:
aws dynamodb scan --table-name users
Did you notice the processed_by attribute? Its value is the name of the KCL consumer Pod, which will make it easier for us to verify the end-to-end autoscaling process.
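As an aside, here is a sketch of how a record processor can stamp each item this way. The INSTANCE_NAME environment variable comes from metadata.name in the Deployment manifest above, and the attribute names match what we see in the users table; the persistence code itself is illustrative, not the repository's exact implementation:

import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class PersistDemo {
    private static final DynamoDbClient DYNAMO = DynamoDbClient.builder().build();

    static void persist(String email, String name, String city) {
        // INSTANCE_NAME is injected from metadata.name in the Deployment
        // manifest, so each item records the Pod that processed it.
        String podName = System.getenv("INSTANCE_NAME");

        DYNAMO.putItem(PutItemRequest.builder()
                .tableName(System.getenv("TABLE_NAME")) // "users"
                .item(Map.of(
                        "email", AttributeValue.builder().s(email).build(),
                        "name", AttributeValue.builder().s(name).build(),
                        "city", AttributeValue.builder().s(city).build(),
                        "processed_by", AttributeValue.builder().s(podName).build()))
                .build());
    }
}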
We are now ready to create the KEDA scaler for Kinesis. Here is the ScaledObject definition. Notice that it targets the kcl-consumer Deployment (the one we just created) and that shardCount is set to 1:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: aws-kinesis-stream-scaledobject
spec:
  scaleTargetRef:
    name: kcl-consumer
  triggers:
    - type: aws-kinesis-stream
      metadata:
        # Required
        streamName: kinesis-keda-demo
        # Required
        awsRegion: "us-east-1"
        shardCount: "1"
        identityOwner: "operator"
Create the KEDA Kinesis scaler:
kubectl apply -f keda-kinesis-scaler.yaml
Remember that the Kinesis stream has two shards, but so far only one Pod of our KCL application was running. Thanks to KEDA, we should now see the second Pod coming up:
kubectl get pods -l=app=kcl-consumer -w
# check logs of the new pod
kubectl logs -f <enter Pod name>
This is because we specified shardCount: "1" in the ScaledObject definition, which means there will be one Pod per shard in the Kinesis stream. Check the kinesis-keda-demo control table in DynamoDB - you should see an update to the leaseOwner attribute. Now, send some more data to the Kinesis stream:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user5@foo.com --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user6@foo.com --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user7@foo.com --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user8@foo.com --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
Verify the value of the processed_by attribute. Since we have scaled out to two Pods, the value should be different across records, since each Pod will process a subset of the records from the Kinesis stream. Next, increase the Kinesis stream shard count from two to three and continue to monitor the KCL application auto-scaling:
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
Once the re-sharding is complete, the KEDA scaler will spring into action and scale out the KCL application to three Pods:
kubectl get pods -l=app=kcl-consumer -w
Verify the Kinesis shard lease update in the kinesis-keda-demo control table in DynamoDB - check the leaseOwner attribute. Then, continue to send more data to the Kinesis stream. As expected, the Pods will share the record processing, and this will be reflected in the processed_by attribute of the users table:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user9@foo.com --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user10@foo.com --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user11@foo.com --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user12@foo.com --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user14@foo.com --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user15@foo.com --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key user16@foo.com --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64)
Scale down - So far, we have only scaled in one direction. What happens when we reduce the shard capacity of the Kinesis stream? Try this out for yourself - reduce the shard count from three to two and see what happens to the KCL application.
Once you are done, delete all the resources:
eksctl delete cluster --name <cluster name>
aws kinesis delete-stream --stream-name kinesis-keda-demo
aws dynamodb delete-table --table-name users
In this post, you learned how to use KEDA to auto-scale a KCL application that consumes data from a Kinesis stream. You can configure the KEDA scaler as per your application requirements. For example, you can set the shardCount to 3 and have one Pod for every three shards in your Kinesis stream (e.g. a six-shard stream would be handled by two Pods). However, if you want to maintain a one-to-one mapping, you can set the shardCount to 1 and KCL will take care of distributed coordination and lease assignment, thereby ensuring that each Pod has one instance of the record processor. This is an effective approach that allows you to scale out your Kinesis stream processing pipeline to meet the demands of your applications.

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.