
Automating and orchestrating data engineering workflows in MWAA - Part 2

Second of a two-part blog illustrating a data engineering workflow using MWAA (with ECS) to execute SQL queries in Redshift

Published Aug 31, 2024
Following the setup and deployment of the required resources in part 1, this part of the blog focuses on executing the MWAA DAG and understanding how to troubleshoot it.

MWAA UI

In the MWAA console, if the deployment was successful, you should see the Airflow environments section with the environment status showing as Available. Click the Open Airflow UI link under the Airflow UI header.
The “PUBLIC_ONLY” option for webserver access makes it easy to access the UI as long as the user has the necessary IAM permissions; the underlying authentication mechanism uses AWS SSO. A more secure way of restricting access in tightly controlled organisations is to use the “PRIVATE_ONLY” webserver access option, as described in the AWS docs: Apache Airflow access modes - Amazon Managed Workflows for Apache Airflow
In the DAGs tab, you will see the list of DAGs, including example_dag_redshift, which was deployed from the dags folder in S3. Click the run button to execute the DAG.
The first task executes a COPY command to copy data from S3 into a Redshift table using the PythonOperator, and prints out the contents of the table in the logs. The DAG script dags/dag_redshift.py imports a python callable from utils/execute_redshift_query.py, relative to the DAG script in the S3 bucket. The callable is passed as the value of the python_callable argument to the PythonOperator.
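As a rough sketch of that wiring (the function name copy_s3_to_redshift below is a placeholder rather than the exact callable defined in the repo):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical import: the callable lives in utils/execute_redshift_query.py,
# which sits alongside the DAG script in the dags folder on S3 (MWAA adds the
# dags folder to the Python path).
from utils.execute_redshift_query import copy_s3_to_redshift

with DAG(
    dag_id="example_dag_redshift",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # The imported callable is passed to the python_callable argument.
    copy_task = PythonOperator(
        task_id="copy_s3_to_redshift",
        python_callable=copy_s3_to_redshift,
    )
```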
This function uses the Python redshift_connector library to connect to Redshift programmatically. This is installed as part of the DAG requirements file. Alternatively, a database connection can be set up from the Airflow UI, which is covered in a later section. The redshift_connector library provides a number of methods for connecting to Redshift, which are demonstrated in the AWS repository notebook example. Here we will use the default IAM profile so that IAM credentials are used. Further configuration options for the connector can be found here. The connection's autocommit property is set to True to automatically commit transactions to the table. This needs to be done after a rollback command to make sure an existing transaction is not in progress.
The host, database, port and cluster_identifier arguments which need to be passed to redshift_connector.connect(...) are retrieved from Systems Manager Parameter Store, which should already have been set up as part of the terraform deployment.
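A minimal sketch of this connection logic is shown below. The parameter names, region and db_user are illustrative assumptions rather than the exact values defined in the terraform code.

```python
import boto3
import redshift_connector

ssm = boto3.client("ssm", region_name="us-east-1")


def get_param(name: str) -> str:
    # Fetch a single value from Systems Manager Parameter Store.
    return ssm.get_parameter(Name=name, WithDecryption=True)["Parameter"]["Value"]


# Hypothetical parameter names -- the terraform deployment defines the real ones.
conn = redshift_connector.connect(
    iam=True,                      # authenticate with IAM credentials
    profile="default",             # use the default IAM profile
    host=get_param("/mwaa/redshift/host"),
    database=get_param("/mwaa/redshift/database"),
    port=int(get_param("/mwaa/redshift/port")),
    cluster_identifier=get_param("/mwaa/redshift/cluster_identifier"),
    db_user="awsuser",             # assumed database user
)

# Roll back any transaction in progress before enabling autocommit.
conn.rollback()
conn.autocommit = True
```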
The CSV data at the S3 URI is copied into public.housing by executing the Redshift COPY command, passing in the IAM role ARN and region. The schema, table and s3_uri are parameterised, and the respective values are fetched from Systems Manager Parameter Store. All the data from the table is then fetched by executing a SELECT command and printed out as a pandas dataframe.
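Continuing the sketch above (conn and get_param are defined there), the COPY and SELECT steps could look roughly like the following; the exact COPY options, parameter names and region are assumptions for illustration.

```python
# Hypothetical parameter names for the parameterised values.
schema, table = "public", "housing"
s3_uri = get_param("/mwaa/redshift/s3_uri")
iam_role_arn = get_param("/mwaa/redshift/copy_role_arn")

cursor = conn.cursor()

# COPY the CSV data from S3 into the Redshift table, passing the IAM role and region.
cursor.execute(
    f"""
    COPY {schema}.{table}
    FROM '{s3_uri}'
    IAM_ROLE '{iam_role_arn}'
    REGION 'us-east-1'
    FORMAT AS CSV
    IGNOREHEADER 1;
    """
)

# Fetch everything back and print it as a pandas dataframe.
cursor.execute(f"SELECT * FROM {schema}.{table};")
print(cursor.fetch_dataframe())
```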
The next task uses the ECS operator to run a containerised task in ECS, which accesses the table via Redshift Spectrum and creates a view containing filtered data. Part 1 already covered the ECS cluster and the task definition referencing the ECR image URI pushed to the ECR repo. In the EcsRunTaskOperator, we need to pass in the ECS cluster name, task definition ARN and launch type (FARGATE). These are fetched from AWS Systems Manager Parameter Store https://github.com/ryankarlos/mwaa_example/blob/master/dags/dag_redshift.py#L17-L20
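A hedged sketch of that operator call is shown below; the placeholder values and the network configuration are assumptions for illustration, whereas in the linked code the real values are read from Parameter Store.

```python
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

# Placeholder values -- in the real DAG these come from Systems Manager Parameter Store.
ecs_cluster_name = "mwaa-example-cluster"
ecs_task_definition_arn = "arn:aws:ecs:us-east-1:123456789012:task-definition/redshift-spectrum:1"
private_subnet_ids = ["subnet-0123456789abcdef0"]

run_ecs_task = EcsRunTaskOperator(
    task_id="run_ecs_task",
    cluster=ecs_cluster_name,
    task_definition=ecs_task_definition_arn,
    launch_type="FARGATE",
    overrides={"containerOverrides": []},  # no per-run container overrides
    network_configuration={
        "awsvpcConfiguration": {
            "subnets": private_subnet_ids,
            "assignPublicIp": "DISABLED",
        }
    },
)
```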
The execution logs from both tasks are visible in the Airflow task logs. The screenshot below shows the copy_s3_to_redshift task described earlier. The COPY command with the values populated can be seen in the logs, along with the queried table printed as a pandas dataframe.
While this is running, have a look at the ECS cluster and notice the ECS task being provisioned in parallel with your Airflow task (select Running in the dropdown to filter the tasks whose desired status is Running).
Once the code execution completes, the desired status of the tasks changes to Stopped, they are deprovisioned and eventually reach a status of Stopped, and the container exits. The ECS task lifecycle is depicted in the flow chart in the docs.
After the task has concluded, check the stopped tasks and open the logs via the ECS console.
The messages printed by the container are sent to the output of the ECS task. If you click the View in CloudWatch option, you should see a new log stream created.
The same logs are also streamed to the Airflow task. If you navigate to the Airflow UI, click on run_ecs_task in the DAG and select the Logs tab, you should see the same logs in the output, as in the screenshot below.
Finally, we can monitor query metrics in the Redshift dashboard by navigating to Query monitoring in the Clusters tab of the navigation bar, as detailed in the docs. An example of query duration over time for the Redshift cluster is shown in the graph below.
Clicking on the example_dag_redshift DAG should open up a new window. In this dashboard you should be able to view the task graph, the DAG code and past execution details (successes and failures).

Database connection from Airflow UI

Alternatively, we can set up database connections directly in the UI and reference the connection ID in the DAG code. In this case, the connection details are stored in the Airflow metadata database. In the Airflow UI, navigate to Admin → Connections and select redshift_default.
In the DAG code, the Redshift connection ID redshift_default will then need to be referenced in the default_args dict. The Amazon MWAA environment can also be configured to use the Secrets Manager backend (AWS docs), which will then fetch the credentials from Secrets Manager.
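For illustration, a minimal sketch of passing the connection ID via default_args is shown below; exactly which operators or callables consume the redshift_conn_id key is an assumption here.

```python
from datetime import datetime

from airflow import DAG

# Keys in default_args are passed as keyword arguments to every operator in the
# DAG, so any operator that accepts redshift_conn_id (e.g. the Redshift operators
# in the Amazon provider) will use the "redshift_default" connection from the UI.
default_args = {
    "redshift_conn_id": "redshift_default",
}

with DAG(
    dag_id="example_dag_redshift",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
) as dag:
    ...
```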

Cleaning up

To remove all the resources deployed, run terraform destroy and answer yes to approve execution of the plan to destroy the infrastructure.
This will take a while, as the MWAA environment can take around 20 minutes to be torn down. Once this completes, you should see a message indicating that all resources have been destroyed.
