A journey to faster feedback on Airflow

Discovery of Airflow’s test pyramid

Karun Japhet | @javatarz

Who am I?

Background for context 😄

Solution consultant at Sahaj

(Former) JVM Jedi

Jedi Master

I was not

Safety nets are essential

What is Airflow?

A short introduction

A batch workflow orchestration platform

  • Create, monitor and schedule workflows as DAGs
  • Python 🐍 for defining workflows as code

DAG as code

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

Workflow as a DAG

Monitoring a DAG

Our usage of Airflow

TODO: insert image explaining the relationship between Airflow as an orchestrator managing our data pipelines and parts of our data lake.

The journey

Recapping the learnings from the past couple of years in this space

Nightmare of no safety nets

Splitting the codebase

  • ⚠️ Writing automated tests for DAGs directly was hard
  • ⌨️ We split the “logic” out into utilities
  • 🟢 Strong unit tests for utilities
  • 🟡 Airflow DAGs, operations and any related functionality didn’t have tests at this stage

Plague of misconfiguration

  • Accidentally ran child DAGs “before” parents
  • This delayed the entire process by over a week
  • Order of tasks was accidentally broken at times and we didn’t realize it

Automating Airflow tests

Manually created database

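import shutil
from pathlib import Path

def pytest_configure() -> None:
    # Copy the pre-built template DB into place unless a test DB already exists
    target = './unittests.db'
    template = './unittests.db.template'
    if not Path(target).is_file():
        shutil.copy(template, target)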

Test task dependency order

Verify cross task communication data

TODO: code sample testing XCom

Render Jinja templates in tasks and test them

What didn’t work

⚠️ Tests were slow due to DagBag loading

  • Every test loads the DagBag again
  • Every DagBag instance scans for DAGs and loads them
  • Each load takes 2-3 seconds

⚠️ A large number of tests existed in the repo

  • Over 1000 tests in a repository took 3-5 minutes to run
  • We started using pytest-xdist for parallelism
  • Shared DagBag state broke tests
  • 🟢 We created a cache-busting policy (dag_bag_instance_key)

The solution
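import logging
from functools import lru_cache
from typing import Optional

from airflow.models import DagBag


@lru_cache()
def dag_bag(dag_bag_instance_key: Optional[str] = None) -> DagBag:
    # The key is unused in the body: it only selects the cache entry, so a
    # test that needs an isolated DagBag passes its own key
    path_to_read_dags = './dags'
    logging.debug(f'Fetching DagBag for config key {dag_bag_instance_key} from path {path_to_read_dags}')

    return DagBag(
        dag_folder=path_to_read_dags,
        include_examples=False,
        store_serialized_dags=False
    )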
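
To make the cache-busting behaviour concrete, here is a minimal sketch that does not need Airflow installed. `FakeDagBag` and the `loads` list are hypothetical stand-ins for the real (slow) `DagBag`; only the `lru_cache` pattern matches the slide above.

```python
from functools import lru_cache
from typing import List, Optional

# Records every construction so we can count how often the "slow" load happens
loads: List[str] = []


class FakeDagBag:
    """Hypothetical stand-in for airflow.models.DagBag (assumption for illustration)."""

    def __init__(self) -> None:
        loads.append("loaded")


@lru_cache()
def dag_bag(dag_bag_instance_key: Optional[str] = None) -> FakeDagBag:
    # The key only selects the cache entry; a new key forces a fresh instance
    return FakeDagBag()


shared_a = dag_bag()                          # first call builds the instance
shared_b = dag_bag()                          # same key, served from the cache
isolated = dag_bag("test_that_mutates_dags")  # new key, fresh instance
```

Tests that only read from the DagBag can share the default entry, while a test that mutates DAG state can pass its own key and get a private instance.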

⚠️ Airflow UI is slow / S3 cost is high

with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    pass  # DAG code here

# This top-level code runs on every DAG load (parse), not only on DAG runs
import boto3

bucket = boto3.Session().resource('s3') \
    .Bucket('sample-bucket')

for bucket_object in bucket.objects.all():
    print(bucket_object.key)

The solution

  1. Identify slow tests
  2. Trace the slowness down to DAG load
  3. Identify the slower DAGs
  4. 🟢 Delay the function execution to the DAG run phase using Jinja
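
The last step can be sketched with plain `jinja2`, the template engine Airflow uses for templated fields. This is an illustration under assumptions, not the code from the talk: `list_keys` is a hypothetical stand-in for the S3 listing, and in a real DAG the callable could be exposed to templates through the DAG's `user_defined_macros` argument.

```python
from typing import List

from jinja2 import Template

# Records each invocation of the "expensive" function
calls: List[str] = []


def list_keys() -> List[str]:
    """Hypothetical stand-in for the expensive S3 bucket listing."""
    calls.append("listed")
    return ["a.csv", "b.csv"]


# DAG load phase: only a template string is stored, nothing is executed yet
command_template = "process {{ list_keys() | join(' ') }}"
calls_after_definition = len(calls)

# DAG run phase: rendering the template is what finally calls list_keys()
rendered = Template(command_template).render(list_keys=list_keys)
```

Because the DAG file now only holds a string, parsing it costs nothing extra; the listing happens once per task run, when the template is rendered.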

The result

Over 2000 tests in less than a minute!

Airflow upgrade to v1.10.15

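  • DB schemas change and all hell breaks loose
  • Automating this required understanding how Airflow runs migrations

from pathlib import Path
from airflow.utils import db

def pytest_configure() -> None:
    target = './unittests.db'
    if not Path(target).is_file():
        # Let Airflow build the schema by running its own migrations
        db.resetdb(rbac=True)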

Airflow upgrade to v2.0.0

The “ports” hack

TODO: add code

Airflow’s internal DB structure changes

TODO: add code

Relative paths for DBs don’t work

Annoying, but can be addressed

TODO: add code
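
One possible way to address it, sketched under assumptions: resolve the database file to an absolute path before building the SQLAlchemy connection URL. The helper name `sqlite_url` is hypothetical, and how the URL is then handed to Airflow (config file or environment variable) depends on the setup.

```python
from pathlib import Path


def sqlite_url(db_file: str) -> str:
    """Build a SQLite connection URL from an absolute path (hypothetical helper)."""
    # resolve() turns './unittests.db' into an absolute path based on the cwd
    return f"sqlite:///{Path(db_file).resolve()}"


url = sqlite_url("./unittests.db")
```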

Disable parallel tests

TODO: add code explanation 😱

Current state

Single migration + Multiple DBs = Parallel test bliss

TODO: add code
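
Until the real code is added, here is a rough sketch of the idea, assuming SQLite files and pytest-xdist. `build_template_db` is a stand-in for running Airflow's migrations once, and the file names and the `dag_run` table are illustrative only.

```python
import os
import shutil
import sqlite3
import tempfile
from pathlib import Path


def build_template_db(template: Path) -> None:
    """Run the expensive schema setup once (stand-in for Airflow's migrations)."""
    conn = sqlite3.connect(str(template))
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS dag_run (id INTEGER PRIMARY KEY)")
        conn.commit()
    finally:
        conn.close()


def worker_db(template: Path, worker_id: str) -> Path:
    """Give each test worker its own copy of the already-migrated database."""
    target = template.with_name(f"unittests.{worker_id}.db")
    if not target.is_file():
        shutil.copy(template, target)
    return target


workdir = Path(tempfile.mkdtemp())
template = workdir / "unittests.db.template"
build_template_db(template)

# pytest-xdist sets PYTEST_XDIST_WORKER (gw0, gw1, ...); "main" covers serial runs
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "main")
db_path = worker_db(template, worker_id)
```

Each worker then points its database connection at its own file, so parallel tests never share database state and the migration cost is paid only once.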

Future state

Auto-detect test groups by system state

TODO: add code to show why this will help

Thank you!

Questions?

Karun Japhet | @javatarz | Sahaj