Apache Airflow is an essential tool for data engineers, who use it to automate, schedule, and manage tasks within Directed Acyclic Graphs (DAGs) so that data pipelines run reliably.
When pushing DAGs to production, we, as data engineers, run the risk of shipping broken DAGs, breaking data pipelines and costing the data team, and the organization as a whole, time and money. This is why testing our DAGs before pushing them to production is key to maintaining robust, scalable data pipelines.
Testing Airflow DAGs used to be a hard task: engineers had to create many throwaway DAGs on the production server just to exercise their code, which cluttered the environment and wasted time.
Why Test Airflow DAGs?
Testing Airflow DAGs is not just about catching bugs. It is about ensuring data integrity, reducing operational risk, and speeding up development. Without tests, a small change in your pipeline could have unintended downstream consequences, ranging from data loss to duplicated reports and dashboards.
The following are reasons why testing DAGs is important in the data lifecycle:
- Prevent failures before they happen, by identifying breaking points in our code before the DAG reaches production
- Test logic and dependencies without waiting for a full run
- Catch regressions and breaking changes early
- Save time and resources for the data team and the whole organization in the long run
Types of tests that can be done on Airflow DAGs.
There are various tests that can be conducted on Airflow DAGs, including:
- Unit tests using Python's `pytest` or `unittest` modules
- Integration tests by setting up test DAGs and mock DAGs
- End-to-end tests by setting up test environments and conducting full DAG runs using the Airflow CLI or `breeze`
1. Unit tests.
Unit tests are all about separating your logic from the DAG and testing each task's logic independently.
This lets us identify as many breaking points as possible in our code and fix them before pushing to production.
Example of a unit test in Airflow.
Assume we have a task in our DAG that calculates the average age of users, and it takes a list of user data as input:
```python
from datetime import datetime
from airflow.decorators import dag, task

default_args = {
    ...
}

@dag(dag_id='user_data_dag', default_args=default_args, schedule='@daily', start_date=datetime(2025, 6, 4))
def user_data_dag():
    ...  # code and other tasks

    @task
    def calculate_avg_age(users):
        if not users:
            return 0
        total_age = sum(user["age"] for user in users)
        return total_age / len(users)

user_data_dag()
```
Let's run unit tests on this task's logic for a few cases. (Note: for the tests to import it, `calculate_avg_age` should be available at module level; with TaskFlow you can also reach the underlying callable via the decorated task's `.function` attribute.)
```python
from dags import calculate_avg_age

def test_calculate_avg_age_with_valid_users():
    users = [
        {"name": "Alice", "age": 30},
        {"name": "Bob", "age": 40}
    ]
    result = calculate_avg_age(users)
    assert result == 35

def test_calculate_avg_age_with_empty_list():
    users = []
    result = calculate_avg_age(users)
    assert result == 0

def test_calculate_avg_age_with_one_user():
    users = [{"name": "Charlie", "age": 22}]
    result = calculate_avg_age(users)
    assert result == 22
```
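Unit tests can also cover task logic that depends on external services. Below is a minimal sketch, assuming a hypothetical `fetch_users` helper (not from the DAG above) that the task logic would normally call over the network; `unittest.mock.patch` replaces it so the test stays fast and offline:

```python
from unittest.mock import patch

# Hypothetical helper the task logic would normally call (e.g. an HTTP client).
def fetch_users():
    raise RuntimeError("network access not available in unit tests")

def calculate_avg_age(users):
    if not users:
        return 0
    return sum(user["age"] for user in users) / len(users)

def average_age_from_api():
    # Task logic under test: fetch users, then compute the average age.
    return calculate_avg_age(fetch_users())

def test_average_age_from_api_with_mocked_fetch():
    fake_users = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 40}]
    # Patch the module-level helper so no real network call is made.
    with patch(f"{__name__}.fetch_users", return_value=fake_users):
        assert average_age_from_api() == 35
```

This keeps the test deterministic: the mocked data is controlled by the test, so only the averaging logic is being verified.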
Pros
- Catches bugs early
- Covers a broad range of logic paths
- Faster feedback
- Encourages writing modular code
- Easier to debug
Cons
- Doesn't cover the entire scope of the project/DAG
- Writing thorough tests takes time
- Doesn't uncover DAG-related bugs and errors
- Can provide a false sense of security, as passing unit tests don't guarantee the DAG will run successfully in production
2. Integration tests.
While unit tests focus on isolated logic, integration tests verify how multiple components (e.g. tasks) interact with each other, such as tasks passing data via Airflow XComs, making API calls, or performing database operations.
These tests simulate a portion (or all) of your DAG logic to ensure everything works together as expected.
Example of an integration test in Airflow
Let's define two tasks: `extract()`, which returns a list of dictionaries containing user data, and `transform()`, which calculates the average age of users in the data set.
```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule=None, start_date=datetime(2025, 6, 4), catchup=False)
def integration_dag():
    @task()
    def extract():
        return [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 40}]

    @task()
    def transform(users):
        avg_age = sum(u["age"] for u in users) / len(users)
        print(f"Average age is: {avg_age}")
        return avg_age

    users = extract()
    transform(users)

dag = integration_dag()
```
Here's an integration test for this DAG:
```python
# tests/test_integration_dag.py
from datetime import datetime

from airflow.models import DagBag, TaskInstance
from airflow.utils.state import State

def test_taskflow_xcom_passes_correctly():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dag_bag.get_dag("integration_dag")
    assert dag is not None
    assert dag.tasks

    extract_task = dag.get_task("extract")
    transform_task = dag.get_task("transform")
    exec_date = datetime(2023, 1, 1)

    extract_ti = TaskInstance(task=extract_task, execution_date=exec_date)
    extract_ti.run(ignore_ti_state=True)

    # XCom value from extract
    users_from_xcom = extract_ti.xcom_pull(task_ids="extract", key="return_value")
    assert isinstance(users_from_xcom, list)
    assert users_from_xcom[0]["name"] == "Alice"

    transform_ti = TaskInstance(task=transform_task, execution_date=exec_date)
    transform_ti.run(ignore_ti_state=True)
    assert transform_ti.state == State.SUCCESS
```
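The same extract-to-transform handoff can also be checked without an Airflow runtime by composing plain functions directly. This is a minimal sketch using stand-in `extract` and `transform` functions that mirror the tasks above, not the real DAG:

```python
def extract():
    # Stand-in for the extract task: returns raw user records.
    return [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 40}]

def transform(users):
    # Stand-in for the transform task: computes the average age.
    return sum(u["age"] for u in users) / len(users)

def test_extract_feeds_transform():
    users = extract()
    # The contract between the two tasks: a list of dicts with an "age" key.
    assert isinstance(users, list)
    assert all("age" in u for u in users)
    assert transform(users) == 35
```

This kind of test runs in milliseconds and catches contract mismatches (e.g. a renamed key) before you ever spin up a DagBag.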
Pros
- Validates real behaviour during DAG runs
- More realistic scope than unit tests
- Useful in test environments
Cons
- Harder to maintain
- Complex setup
- Slower in execution
3. End-to-end tests.
End-to-end (E2E) testing is the practice of executing an entire DAG or a significant portion of it in an environment that mimics a production environment. It tests not just your task logic, but also task dependencies, retries, data movement, and overall orchestration.
It simulates a DAG run using real data or test data and ensures the entire flow behaves as expected from task execution to XComs, file movement, database writes, or API calls.
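The shape of an end-to-end assertion can be sketched without a real Airflow deployment: run the whole pipeline on test data and check the final artifact it produces. Below is a minimal sketch with hypothetical extract/transform/load steps (stand-ins, not an actual Airflow run) writing to a temporary file:

```python
import json
import tempfile
from pathlib import Path

def extract():
    return [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 40}]

def transform(users):
    return {"avg_age": sum(u["age"] for u in users) / len(users)}

def load(result, path):
    # Final step: persist the result, as a production pipeline might.
    Path(path).write_text(json.dumps(result))

def run_pipeline(path):
    load(transform(extract()), path)

def test_pipeline_end_to_end():
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "result.json"
        run_pipeline(out)
        # Assert on the final artifact, not on intermediate steps.
        assert json.loads(out.read_text()) == {"avg_age": 35.0}
```

The key idea carries over to real E2E tests: feed the DAG known inputs, let every stage run, and assert only on the observable end state (a file, a table, an API response).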
Pros
- Tests the full workflow
- Mimics the production environment, so the DAG behaves as it would in a real deployment
- Uncovers integration issues
Cons
- Slow to run
- Harder to set up and debug
4. Testing using `airflow dags test`
The `airflow dags test` command allows you to manually run a DAG from the command line for a given execution date, task by task, without waiting for a scheduler or triggering a full scheduled DAG run.
It simulates a DAG run without scheduling or backfilling.
To use this command in the CLI:
```shell
# Syntax: airflow dags test <dag_id> <execution_date>
airflow dags test sample_etl_dag 2023-01-01
```
NOTE: This runs on your local machine, but the tasks themselves execute for real, so any calls to external systems (e.g. APIs or databases) will actually run unless you mock them.
5. Using the `dag.test()` method
We can call the `dag.test()` method at the end of our DAG file to test the DAG whenever the file is run directly.
It is handy for local development, as the DAG is exercised simply by running the file.
Example:
```python
from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'deecodes',
    'retries': 5,
    'retry_delay': timedelta(minutes=1)
}

@dag(dag_id='test_dag', default_args=default_args, start_date=datetime(2025, 5, 30), schedule='@hourly', catchup=False)
def test_dag():
    @task
    def say_hello():
        print("Hello there, Airflow is running!")

    say_hello()

dag_test = test_dag()

# let's test this DAG
if __name__ == '__main__':
    dag_test.test()
```
Then, run our file in the terminal to test:
```shell
python3 test_dag.py
```
Our result:
```
[2025-06-06T18:51:51.238+0300] {dag.py:4435} INFO - dagrun id: test_dag
[2025-06-06T18:51:51.278+0300] {dag.py:4451} INFO - created dagrun <DagRun test_dag @ 2025-06-06 15:51:51.044158+00:00: manual__2025-06-06T15:51:51.044158+00:00, state:running, queued_at: None. externally triggered: False>
[2025-06-06T18:51:51.349+0300] {dag.py:4396} INFO - [DAG TEST] starting task_id=say_hello map_index=-1
[2025-06-06T18:51:51.350+0300] {dag.py:4399} INFO - [DAG TEST] running task <TaskInstance: test_dag.say_hello manual__2025-06-06T15:51:51.044158+00:00 [scheduled]>
[2025-06-06T18:51:53.606+0300] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='deecodes' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='say_hello' AIRFLOW_CTX_EXECUTION_DATE='2025-06-06T15:51:51.044158+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-06-06T15:51:51.044158+00:00'
Task instance is in running state
Previous state of the Task instance: queued
Current task name:say_hello state:scheduled start_date:None
Dag name:test_dag and current dag run status:running
[2025-06-06T18:51:53.611+0300] {taskinstance.py:732} INFO - ::endgroup::
Hello there, Airflow is running!
[2025-06-06T18:51:53.614+0300] {python.py:240} INFO - Done. Returned value was: None
[2025-06-06T18:51:53.624+0300] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-06-06T18:51:53.625+0300] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=test_dag, task_id=say_hello, run_id=manual__2025-06-06T15:51:51.044158+00:00, execution_date=20250606T155151, start_date=, end_date=20250606T155153
Task instance in success state
Previous state of the Task instance: running
Dag name:test_dag queued_at:None
Task hostname:Denzo.localdomain operator:_PythonDecoratedOperator
[2025-06-06T18:51:53.696+0300] {dag.py:4410} INFO - [DAG TEST] end task task_id=say_hello map_index=-1
[2025-06-06T18:51:53.707+0300] {dagrun.py:854} INFO - Marking run <DagRun test_dag @ 2025-06-06 15:51:51.044158+00:00: manual__2025-06-06T15:51:51.044158+00:00, state:running, queued_at: None. externally triggered: False> successful
Dag run in success state
Dag run start:2025-06-06 15:51:51.044158+00:00 end:2025-06-06 15:51:53.708913+00:00
[2025-06-06T18:51:53.710+0300] {dagrun.py:905} INFO - DagRun Finished: dag_id=test_dag, execution_date=2025-06-06 15:51:51.044158+00:00, run_id=manual__2025-06-06T15:51:51.044158+00:00, run_start_date=2025-06-06 15:51:51.044158+00:00, run_end_date=2025-06-06 15:51:53.708913+00:00, run_duration=2.664755, state=success, external_trigger=False, run_type=manual, data_interval_start=2025-06-06 14:00:00+00:00, data_interval_end=2025-06-06 15:00:00+00:00, dag_hash=None
```
Pros
- Allows for faster local testing
- Simple debugging
- No running Airflow deployment (scheduler or webserver) required
Cons
- This method is not scalable
- It doesn't test the DAG's integrity
- It doesn't simulate a true scheduled DAG run
Final thoughts
Testing your Airflow DAGs may feel like overkill when you are moving fast, but the long-term stability and confidence it brings are well worth the effort. Treat your data pipelines like production code, because they are.
By following the practical steps in this guide, you and your team will be better equipped to deliver robust, scalable pipelines that do not break when it matters most: in production.
To learn more about testing in Airflow, visit the following links:
- Airflow's testing documentation
- Astronomer's Testing documentation
- Airflow's best practices in building DAGs
- etsy-dagtest, on Apache Airflow's YouTube channel
I hope this post has enriched your knowledge in testing your Airflow DAGs. Please leave a like, comment, and follow for more data engineering posts just like this one.
Do you have any other methods that you use to test your Airflow DAGs before pushing them to production? Let me know down in the comments!
For any comments or communication, please email me at denzelkinyua11@gmail.com to get in touch.