Create an ETL Pipeline with Amazon EMR and Apache Spark
Learn how to build an ETL pipeline for batch processing with Amazon EMR and Apache Spark.
Extract, Transform, and Load (or ETL) - sometimes called Ingest, Transform, and Export - is vital for building a robust data engineering pipeline for any organization. Essentially, if the ETL pipeline is designed and built using the right tools and services, it brings high value to any organization both for batch and real-time processing. But designing and building such a pipeline is a time-consuming task, and it requires different skillsets considering the number of tools and frameworks in this big data space. Luckily, it's pretty easy if you're using EMR and Spark.
Batch ETL is a common use case across many organizations. This tutorial will provide a starting point, which can help you to build more complex data pipelines in AWS using Amazon EMR (Amazon Elastic MapReduce) and Apache Spark. Here's how to do it.
We are going to use PySpark to interact with the Spark cluster. PySpark allows you to write Spark applications using Python APIs.
In this guide, you will:
- Create and set up an Amazon EMR cluster
- Submit a PySpark job on EMR
- Integrate Amazon EMR with Amazon S3
Let's get started!
For this tutorial, let's assume you have a vendor who provides incremental sales data at the end of every month. The file arrives in S3 as a
CSV file and it needs to be processed and made available to your data analysts for querying and analysis.
To implement this data pipeline, we will use an EMR cluster with Spark as the distributed processing engine. We'll use S3 for storing the:
RAWdata (which is the input and unprocessed data)
CLEANSEDdata (which is output and processed data)
We need to build a data pipeline such that it will take this new sales file from the S3 bucket, process it with required transformations using Amazon EMR, and save the cleaned and transformed data into the target S3 bucket, which will be used later on for querying using Amazon Athena.
To implement our data processing pipeline, we need to create an EMR cluster that will run our ETL jobs and an S3 bucket to store the raw and processed data. Then we can start our job on the cluster.
Before we create an EMR cluster we need to create a
Key Pair, which we would need to access the EMR cluster's master node later on. So let's do that really quickly.
- Log in to your AWS account and navigate to the EC2 console and click on the Key Pairs option on the left menu bar. And then, click
Create Key Pair.
- Provide a name (
mykey-emr) for your key pair and click
Create Key Pair.
- Now we can go ahead and create an
Amazon EMR cluster. For that, navigate to Amazon EMR in the console and click Create Cluster to create an EMR cluster.
MyDemoEMRClusterto your EMR cluster, and select the following:
- Select the latest release of EMR under Software configuration section
- Select Spark under Application bundle section,
- Select the right EC2 key pair (which you created in the previous step) under the Security and access section
Keep everything else as default and click on Create cluster. This will create a cluster with three instances.
- Cluster creation will take some time, and after couple of minutes, you will see that the cluster is up and running with a state as
Waiting(which means the cluster is now ready and waiting to execute any ETL job).
Now we will create an Amazon S3 bucket and create two sub-folders within that, which will be used to store
- Navigate to the Amazon S3 console and click on Create Bucket.
- Create a bucket (e.g.
- Once the bucket is created, create two sub-folders named:
- Upload the sales dataset CSV file in the bucket under the folder
Now, that we have the dataset uploaded in S3, it's time to submit the PySpark job from our EMR cluster.
Sign in to the AWS Management Console, and open the Amazon EMR console.
Under EMR on EC2 in the left navigation pane, choose Clusters, and then select the
myDemoEMRClustercluster where you want to retrieve the public DNS name.
Note the Primary node public DNS value in the Summary section of the cluster details page.
- SSH to the EMR cluster's Primary node from your terminal
1ssh -i "mykey-emr.pem" email@example.com
Copy the PySpark code
etl-job.pyand save on the
Primary Nodeunder the home directory and make the following changes and save the file:
PySpark joband wait for the job to complete before proceeding.
1sudo spark-submit etl-job.py
- Once the job completes, check the S3 bucket under the folder
cleaned_data, you will see the new transformed and processed data in parquet format.
cleansed data is available in Amazon S3 in the form of parquet format, but to make it more consumable for data analysts or data scientists, it would be great if we could enable querying the data through SQL by making it available as a database table.
To make that integration, we can follow a two-step approach:
- We need to run the Glue crawler to create an AWS Glue Data Catalog table on top of the S3 data.
- Once that is done, we can run a query in Amazon Athena to validate the output.
- Navigate to the AWS Glue crawler console and click on Create Crawler.
- Give a name for the Glue Crawler (
- Add the data source as S3 bucket where you have your cleansed and processed data (
- Create an IAM role (
AWSGlueServiceRole-default) and attached the same. You can create a role and attach the following policies for more details you can refer to this and follow the steps:
The AWSGlueServiceRole AWS managed policy, which grants the required permissions on the Data Catalog
An inline policy that grants permissions on the data source (
- Create a database by clicking on Add database and select the same from dropdown menu (
- Review and verify all the details and click on Create crawler.
- Once the crawler is created, select the crawler and click on Run.
- Once the crawler finishes its run, you will see
Now that we have the Glue Data Catalog table created, we can navigate to Amazon Athena to query the data using SQL.
Until now, we have extracted the data from Amazon S3, and then transformed the data by converting the data into parquet format using a Glue ETL (pySpark) job. Finally we will use that cleaned data for analysis using Amazon Athena.
- Open Athena query editor. You can keep Data Source as the default
my_demo_dbfor Database (as show in the screen shot) and run the following query.
1SELECT * FROM "my_demo_db"."cleaned_data" limit 10;
- Now you can perform other SQL queries to analyze the data. For example, if we would like to know the
region per segment wise, you can run this:
SUM(forecasted_monthly_revenue) as forecast_monthly_revenue
GROUP BY segment, region;
Now that you’ve finished this walk-through, you can delete all the following resources to avoid incurring unexpected costs:
- Delete the EMR Cluster
- Delete the Amazon S3 bucket
1aws s3 rb s3://<YOUR_BUCKET_LOCATION> --force
- Delete the Glue Database
Congratulations! You have finished the tutorial on creating an ETL pipeline with Amazon EMR and Apache Spark.
In this tutorial, we learned how to build an ETL pipeline, which can be applied in different batch processing use-cases, like e-commerce sales data analysis. We learned how to extract the data from S3 and then transform the data based on our requirement by using a simple Glue ETL (pySpark) job. And then finally, we analyzed the data using SQL via Amazon Athena. If you're interested in learning more about ERM and Spark-based ETL, you may like to check out this workshop.
If you enjoyed this tutorial, found any issues, or have feedback for us, please send it our way!.