How to Test and Deploy Airflow DAGs to Cloud Composer

August 14, 2020 (updated August 11, 2022)  |  Read time: 5 min

Apache Airflow is open source software that lets developers build data pipelines by writing Python scripts. These scripts, called directed acyclic graphs (DAGs), tell the Airflow engine which tasks to execute, the order in which to execute them, and the schedule on which to run them. Cloud Composer is a managed Airflow service from Google Cloud Platform (GCP) that runs on Kubernetes.

This article will cover how to integrate Cloud Build to automatically test and deploy Airflow data pipelines to Composer.

  1. Defining a DAG
  2. Testing a DAG
  3. Deploying DAGs to Composer
  4. Using Cloud Build to automate the testing and deployment process

1. Defining a DAG

The DAG we’ll be deploying uses the PostgresToGoogleCloudStorageOperator. It runs a SQL query against the source dataset, stored in the loan_data table. In the example below, the operator transfers the entire dataset to Cloud Storage and adds a computed field along the way.

from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.x import path; in Airflow 2, this operator lives in the Google provider package
from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator

DESTINATION_BUCKET = 'sc-flow-dev-data'
DESTINATION_DIRECTORY = "transferred"

dag_params = {
    'dag_id': 'PostgresToCloudStorageExample',
    'start_date': datetime(2020, 7, 7),
    'schedule_interval': timedelta(days=1),
}

with DAG(**dag_params) as dag:
    # Run the query against Postgres and land the results in Cloud Storage as JSON
    move_results = PostgresToGoogleCloudStorageOperator(
        task_id="move_results",
        bucket=DESTINATION_BUCKET,
        filename=DESTINATION_DIRECTORY + "/{{ execution_date }}" + "/{}.json",
        sql='''SELECT *, due_date::date - effective_date::date as calc FROM loan_data;''',
        retries=3,
        postgres_conn_id="postgres_poc"
    )

GitHub source: https://github.com/1904labs/example-airflow/blob/master/dags/loan_data.py

2. Testing a DAG

To enable Cloud Build to automatically test the DAG above, we’ll add a single validation test to our repository. Borrowing from a Medium post on Airflow testing, this test uses DagBag to import every DAG in the repository and fails if any DAG cannot be loaded, for example because of a syntax error or a cycle.

import unittest
from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):

    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        self.dagbag = DagBag(dag_folder="dags", include_examples=False)

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )

suite = unittest.TestLoader().loadTestsFromTestCase(TestDagIntegrity)
unittest.TextTestRunner(verbosity=2).run(suite)

Source: https://github.com/1904labs/example-airflow/blob/master/test/dag_validation_test.py

If we want to run this test locally, we can use pytest from the command line. We will also use pytest as part of an automated test stage in Cloud Build.
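As a minimal sketch, assuming Airflow and the project’s other dependencies are listed in requirements.txt (the same file the Cloud Build test step installs from), a local run looks like this:

pip install pytest && pip install -r requirements.txt
pytest test/*_test.py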

3. Deploying DAGs to Composer

Cloud Composer is a managed Airflow service that runs on Kubernetes. Each Composer environment holds a reference to a Cloud Storage directory from which it retrieves DAGs. This DAG source directory is created when the Composer environment is initialized, and we can find its exact path in the console under Composer Environments -> Environment Configuration.
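The same path can also be read from the command line with gcloud. A quick sketch, where ENVIRONMENT_NAME is a placeholder for your own environment and us-central1 is assumed as the location:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"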

Any DAGs that we drop in the directory above will automatically be picked up and brought into our Composer environment.

The DAG above lives in a '/dags' directory within our local git repository, and we can deploy it to Composer by using gsutil to sync that directory to Cloud Storage:

gsutil -m rsync -d -r ./dags gs://us-central1-sc-flow-dev-com-e543240e-bucket/dags

The gsutil rsync command mirrors our local dags directory to Cloud Storage; the -d flag also removes files from the bucket that no longer exist locally. Composer automatically recognizes changes in the bucket and brings in new or updated DAGs. Instead of deploying from our local computer, we can take this same command and turn it into an automated deploy stage using Cloud Build.

4. Using Cloud Build to Automate the Testing and Deployment Process

Let’s put all the pieces together to create a CI/CD pipeline for our Airflow DAGs. Using Cloud Build, we can integrate with GitHub and define our build, test, and deploy steps in a configuration file.

We start in Cloud Build by configuring a trigger. To set one up, we pick a name, select the repository to connect to, and provide a regular-expression filter specifying which branches should trigger the build. In our case, every branch triggers the pipeline.
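The same trigger can also be created from the command line. A sketch, assuming a hypothetical trigger name and the example repository used throughout this post:

gcloud builds triggers create github \
    --name="airflow-dag-pipeline" \
    --repo-owner="1904labs" \
    --repo-name="example-airflow" \
    --branch-pattern=".*" \
    --build-config="cloudbuild.yaml"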

Once the trigger is set up, we define our steps by adding a `cloudbuild.yaml` file to the project.

steps:
- name: 'docker.io/library/python:3.7'
  id: Test
  entrypoint: /bin/sh
  args:
  - -c
  - 'pip install pytest && pip install -r requirements.txt && pytest test/*_test.py'
- name: gcr.io/google.com/cloudsdktool/cloud-sdk
  id: Deploy
  entrypoint: bash
  args: [ '-c', 'if [ "$BRANCH_NAME" == "master" ]; then echo "$BRANCH_NAME" && gsutil -m rsync -d -r ./dags gs://${_COMPOSER_BUCKET}/dags; else echo "Working on $BRANCH_NAME"; fi' ]
substitutions:
  _COMPOSER_BUCKET: us-central1-sc-flow-dev-com-e543240e-bucket

Source: https://github.com/1904labs/example-airflow/blob/master/cloudbuild.yaml

The cloudbuild.yaml file above contains both a test step and a deploy step. The test step uses pip to install our dependencies and pytest to run the test suite. Because our trigger fires on a push to any branch, the test step runs every time.
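We can also exercise this configuration without a trigger by submitting a build manually from the repository root. A sketch, noting that built-in substitutions such as BRANCH_NAME are only populated by triggered builds, so we pass a value explicitly alongside the bucket substitution:

gcloud builds submit . \
    --config=cloudbuild.yaml \
    --substitutions=BRANCH_NAME=test,_COMPOSER_BUCKET=us-central1-sc-flow-dev-com-e543240e-bucket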

The deploy step uses gsutil to copy DAGs from the git repository to the Composer DAG source directory. Our trigger fires on a push to any branch, but we only want some steps, such as deploy, to run for certain branches. Cloud Build only allows branches to be specified in the trigger itself, not within the `.yaml` file. Ideally (though Cloud Build does not currently support this), we would specify the branch directly on the deploy step:

- name: gcr.io/google.com/cloudsdktool/cloud-sdk
  id: Deploy
  branch: master

Two workarounds for the above limitation:

  1. For a single repository, add multiple triggers to Cloud Build. You could have one trigger for all branches, a separate trigger for a development branch, and a third trigger for the master branch. Each trigger references its own `.yaml` configuration file containing custom steps for that specific branch (see the sketch after this list).
  2. Use a bash conditional on the branch name to decide whether to run the step, as our deploy step above does.
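For the first workaround, a master-only deploy trigger could look like the following; this builds on the trigger command sketched earlier, and the trigger name and cloudbuild-deploy.yaml file name are hypothetical:

gcloud builds triggers create github \
    --name="dag-deploy-master" \
    --repo-owner="1904labs" \
    --repo-name="example-airflow" \
    --branch-pattern="^master$" \
    --build-config="cloudbuild-deploy.yaml"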

Where Do We Go From Here?

Continuous deployment of Airflow data pipelines to Composer allows data engineers to work locally, test out changes, and ensure that improvements and bug fixes reach production. It also improves security, since the production environment can be locked down against manual changes. Building on the concepts in this blog post, we can create a more advanced CI/CD process that pulls pipelines from multiple git repositories, versions the DAGs, verifies successful deployments by running system tests against the production environment, and uses separate Composer environments for development and production.