Learn to implement production-grade testing for Apache Airflow DAGs using pytest, including unit tests for DAG structure validation, integration testing with test databases, and automated validation pipelines with CI/CD integration.
Prerequisites
- Apache Airflow 2.8+
- Python 3.9+
- PostgreSQL (optional)
- Git for version control
What this solves
Testing Apache Airflow DAGs is critical for production deployments to prevent runtime failures, validate task dependencies, and ensure data pipeline reliability. This tutorial shows you how to implement comprehensive testing strategies using pytest, including DAG structure validation, task execution testing, and automated validation pipelines that integrate with CI/CD workflows.
Step-by-step configuration
Update system packages and install prerequisites
Start by updating your system and installing Python development tools required for testing framework setup.
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv python3-dev build-essential
Create Airflow testing environment
Set up a dedicated Python virtual environment for Airflow testing with isolated dependencies.
mkdir -p /opt/airflow-testing
cd /opt/airflow-testing
python3 -m venv airflow-test-env
source airflow-test-env/bin/activate
export AIRFLOW_HOME=/opt/airflow-testing
Install Airflow and testing dependencies
Install Apache Airflow with testing constraints and pytest framework with essential plugins for DAG testing.
pip install --upgrade pip
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
pip install "apache-airflow[postgres,celery]==2.8.1" --constraint "${CONSTRAINT_URL}"
pip install pytest pytest-airflow pytest-mock pytest-cov freezegun
pip install sqlalchemy-utils factory-boy
Initialize Airflow test database
Configure Airflow with SQLite for testing and initialize the metadata database schema.
airflow db init
airflow users create \
--username admin \
--firstname Test \
--lastname Admin \
--role Admin \
--email admin@example.com \
--password testpassword123
Create DAG testing directory structure
Establish a standardized directory structure for organizing DAG files, tests, and configuration.
mkdir -p dags tests/unit tests/integration tests/fixtures
mkdir -p tests/conftest tests/utils
touch tests/__init__.py tests/unit/__init__.py tests/integration/__init__.py
Configure pytest settings
Create pytest configuration with Airflow-specific settings and coverage reporting.
[tool:pytest]
addopts =
--verbose
--strict-markers
--strict-config
--cov=dags
--cov-report=html
--cov-report=term-missing
--cov-fail-under=80
testpaths = tests
markers =
unit: Unit tests
integration: Integration tests
slow: Slow running tests
dag_validation: DAG validation tests
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
Create test configuration and fixtures
Set up pytest fixtures for Airflow testing with database isolation and DAG loading utilities.
import pytest
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import DagBag, TaskInstance
from airflow.utils.db import create_session
from airflow.configuration import conf
from sqlalchemy_utils import database_exists, create_database, drop_database
@pytest.fixture(scope="session")
def airflow_db():
"""Create and tear down test database."""
db_url = conf.get('database', 'sql_alchemy_conn')
if database_exists(db_url):
drop_database(db_url)
create_database(db_url)
os.system('airflow db init')
yield
if database_exists(db_url):
drop_database(db_url)
@pytest.fixture
def dagbag():
"""Return DagBag instance for testing."""
return DagBag(dag_folder='dags/', include_examples=False)
@pytest.fixture
def sample_dag():
"""Create a sample DAG for testing."""
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'test_dag',
default_args=default_args,
description='Test DAG for validation',
schedule_interval=timedelta(days=1),
catchup=False
)
return dag
Create sample DAG for testing
Implement a sample DAG with multiple tasks and dependencies to demonstrate testing strategies.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
def extract_data(**context):
"""Extract data from source."""
print(f"Extracting data for {context['execution_date']}")
return "extracted_data"
def transform_data(**context):
"""Transform extracted data."""
extracted = context['task_instance'].xcom_pull(task_ids='extract_task')
print(f"Transforming data: {extracted}")
return "transformed_data"
def load_data(**context):
"""Load transformed data."""
transformed = context['task_instance'].xcom_pull(task_ids='transform_task')
print(f"Loading data: {transformed}")
return "load_complete"
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email': ['admin@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'sample_etl_pipeline',
default_args=default_args,
description='Sample ETL pipeline for testing',
schedule_interval='@daily',
max_active_runs=1,
catchup=False,
tags=['etl', 'sample', 'testing']
)
validate_input = BashOperator(
task_id='validate_input',
bash_command='echo "Validating input data"',
dag=dag
)
extract_task = PythonOperator(
task_id='extract_task',
python_callable=extract_data,
dag=dag
)
transform_task = PythonOperator(
task_id='transform_task',
python_callable=transform_data,
dag=dag
)
load_task = PythonOperator(
task_id='load_task',
python_callable=load_data,
dag=dag
)
notify_success = EmailOperator(
task_id='notify_success',
to=['admin@example.com'],
subject='ETL Pipeline Success',
html_content='ETL pipeline completed successfully
',
dag=dag
)
Task dependencies
validate_input >> extract_task >> transform_task >> load_task >> notify_success
Implement DAG structure validation tests
Create unit tests to validate DAG structure, task configuration, and dependency relationships.
import pytest
from datetime import datetime, timedelta
from airflow.models import DagBag
from airflow.utils.dag_cycle import check_cycle
class TestDAGValidation:
"""Test DAG structure and configuration."""
def test_dag_loaded(self, dagbag):
"""Test that DAG is loaded without errors."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert dag is not None
assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"
def test_dag_has_tags(self, dagbag):
"""Test that DAG has required tags."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert 'etl' in dag.tags
assert 'testing' in dag.tags
def test_dag_has_required_config(self, dagbag):
"""Test DAG has proper configuration."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert dag.schedule_interval == '@daily'
assert dag.max_active_runs == 1
assert dag.catchup is False
assert dag.default_args['retries'] == 2
assert dag.default_args['retry_delay'] == timedelta(minutes=5)
def test_dag_has_description(self, dagbag):
"""Test DAG has meaningful description."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert dag.description is not None
assert len(dag.description) > 10
def test_dag_start_date(self, dagbag):
"""Test DAG start date is properly set."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert dag.start_date == datetime(2024, 1, 1)
def test_dag_task_count(self, dagbag):
"""Test DAG has expected number of tasks."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert len(dag.tasks) == 5
def test_dag_task_names(self, dagbag):
"""Test DAG contains expected tasks."""
dag = dagbag.get_dag('sample_etl_pipeline')
expected_tasks = {
'validate_input', 'extract_task', 'transform_task',
'load_task', 'notify_success'
}
actual_tasks = {task.task_id for task in dag.tasks}
assert expected_tasks == actual_tasks
def test_dag_dependencies(self, dagbag):
"""Test DAG task dependencies are correct."""
dag = dagbag.get_dag('sample_etl_pipeline')
validate_task = dag.get_task('validate_input')
extract_task = dag.get_task('extract_task')
transform_task = dag.get_task('transform_task')
load_task = dag.get_task('load_task')
notify_task = dag.get_task('notify_success')
# Test downstream dependencies
assert extract_task in validate_task.downstream_list
assert transform_task in extract_task.downstream_list
assert load_task in transform_task.downstream_list
assert notify_task in load_task.downstream_list
# Test upstream dependencies
assert validate_task in extract_task.upstream_list
assert extract_task in transform_task.upstream_list
assert transform_task in load_task.upstream_list
assert load_task in notify_task.upstream_list
def test_dag_no_cycles(self, dagbag):
"""Test DAG has no circular dependencies."""
dag = dagbag.get_dag('sample_etl_pipeline')
check_cycle(dag)
def test_task_retries_configured(self, dagbag):
"""Test tasks have retry configuration."""
dag = dagbag.get_dag('sample_etl_pipeline')
for task in dag.tasks:
assert task.retries >= 1
assert task.retry_delay is not None
Create task execution unit tests
Implement unit tests for individual task logic and Python callable functions.
import pytest
from unittest.mock import Mock, patch
from dags.sample_etl_dag import extract_data, transform_data, load_data
class TestTaskFunctions:
"""Test individual task functions."""
def test_extract_data_function(self):
"""Test extract_data function returns expected value."""
context = {'execution_date': '2024-01-01'}
result = extract_data(**context)
assert result == "extracted_data"
@patch('dags.sample_etl_dag.print')
def test_extract_data_logs_execution_date(self, mock_print):
"""Test extract_data logs execution date."""
context = {'execution_date': '2024-01-01'}
extract_data(**context)
mock_print.assert_called_once_with("Extracting data for 2024-01-01")
def test_transform_data_function(self):
"""Test transform_data function with mocked XCom."""
mock_ti = Mock()
mock_ti.xcom_pull.return_value = "extracted_data"
context = {'task_instance': mock_ti}
result = transform_data(**context)
assert result == "transformed_data"
mock_ti.xcom_pull.assert_called_once_with(task_ids='extract_task')
def test_load_data_function(self):
"""Test load_data function with mocked XCom."""
mock_ti = Mock()
mock_ti.xcom_pull.return_value = "transformed_data"
context = {'task_instance': mock_ti}
result = load_data(**context)
assert result == "load_complete"
mock_ti.xcom_pull.assert_called_once_with(task_ids='transform_task')
def test_extract_data_with_different_dates(self):
"""Test extract_data handles different execution dates."""
dates = ['2024-01-01', '2024-01-02', '2024-12-31']
for date in dates:
context = {'execution_date': date}
result = extract_data(**context)
assert result == "extracted_data"
def test_transform_data_handles_none_input(self):
"""Test transform_data handles None input gracefully."""
mock_ti = Mock()
mock_ti.xcom_pull.return_value = None
context = {'task_instance': mock_ti}
# Should not raise exception
result = transform_data(**context)
assert result == "transformed_data"
Implement integration tests
Create integration tests that validate DAG execution with actual Airflow task instances.
import pytest
from datetime import datetime, timedelta
from airflow.models import DagBag, TaskInstance
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from airflow.utils.db import create_session
from freezegun import freeze_time
@pytest.mark.integration
class TestDAGExecution:
"""Integration tests for DAG execution."""
def test_dag_run_success(self, dagbag, airflow_db):
"""Test complete DAG run executes successfully."""
dag = dagbag.get_dag('sample_etl_pipeline')
execution_date = datetime(2024, 1, 1)
# Create DAG run
dag_run = dag.create_dagrun(
run_id=f'test_run_{execution_date}',
state=State.RUNNING,
execution_date=execution_date,
start_date=datetime.now(),
run_type=DagRunType.MANUAL
)
# Execute tasks in order
task_instances = []
for task in dag.topological_sort():
ti = TaskInstance(task, execution_date=execution_date)
ti.dag_run = dag_run
task_instances.append(ti)
# Validate task instances created
assert len(task_instances) == 5
assert dag_run.state == State.RUNNING
def test_task_dependencies_execution_order(self, dagbag, airflow_db):
"""Test tasks execute in correct dependency order."""
dag = dagbag.get_dag('sample_etl_pipeline')
execution_date = datetime(2024, 1, 1)
# Get topologically sorted tasks
sorted_tasks = dag.topological_sort()
task_ids = [task.task_id for task in sorted_tasks]
# Verify execution order
assert task_ids.index('validate_input') < task_ids.index('extract_task')
assert task_ids.index('extract_task') < task_ids.index('transform_task')
assert task_ids.index('transform_task') < task_ids.index('load_task')
assert task_ids.index('load_task') < task_ids.index('notify_success')
def test_task_xcom_data_flow(self, dagbag, airflow_db):
"""Test XCom data flows correctly between tasks."""
dag = dagbag.get_dag('sample_etl_pipeline')
execution_date = datetime(2024, 1, 1)
# Create and run extract task
extract_task = dag.get_task('extract_task')
ti = TaskInstance(extract_task, execution_date=execution_date)
with create_session() as session:
ti.run(session=session)
# Check XCom value was pushed
xcom_value = ti.xcom_pull(task_ids='extract_task')
assert xcom_value == "extracted_data"
@freeze_time("2024-01-01 12:00:00")
def test_dag_scheduling(self, dagbag):
"""Test DAG scheduling configuration."""
dag = dagbag.get_dag('sample_etl_pipeline')
# Test schedule interval
assert dag.schedule_interval == '@daily'
# Test next execution time
next_run = dag.next_dagrun_info(None)
assert next_run is not None
def test_task_retry_configuration(self, dagbag):
"""Test task retry settings are properly configured."""
dag = dagbag.get_dag('sample_etl_pipeline')
for task in dag.tasks:
assert task.retries == 2
assert task.retry_delay == timedelta(minutes=5)
def test_dag_max_active_runs(self, dagbag):
"""Test DAG respects max_active_runs setting."""
dag = dagbag.get_dag('sample_etl_pipeline')
assert dag.max_active_runs == 1
Create test utilities and helpers
Implement utility functions for common testing patterns and mock data generation.
from datetime import datetime, timedelta
from airflow.models import TaskInstance, DagRun
from airflow.utils.state import State
from airflow.utils.types import DagRunType
class DAGTestHelper:
"""Helper class for DAG testing utilities."""
@staticmethod
def create_dag_run(dag, execution_date=None, state=State.RUNNING):
"""Create a DAG run for testing."""
if execution_date is None:
execution_date = datetime(2024, 1, 1)
return dag.create_dagrun(
run_id=f'test_run_{execution_date}',
state=state,
execution_date=execution_date,
start_date=datetime.now(),
run_type=DagRunType.MANUAL
)
@staticmethod
def create_task_instance(task, execution_date=None, state=State.NONE):
"""Create a task instance for testing."""
if execution_date is None:
execution_date = datetime(2024, 1, 1)
ti = TaskInstance(task, execution_date=execution_date)
ti.state = state
return ti
@staticmethod
def validate_dag_structure(dag, expected_task_count, expected_tasks):
"""Validate basic DAG structure."""
assert len(dag.tasks) == expected_task_count
actual_tasks = {task.task_id for task in dag.tasks}
assert set(expected_tasks) == actual_tasks
return True
@staticmethod
def validate_task_dependencies(dag, dependency_map):
"""Validate task dependencies match expected structure."""
for task_id, expected_downstream in dependency_map.items():
task = dag.get_task(task_id)
actual_downstream = {t.task_id for t in task.downstream_list}
assert set(expected_downstream) == actual_downstream
return True
@staticmethod
def get_execution_dates_range(start_date, end_date, schedule_interval):
"""Generate execution dates for testing."""
dates = []
current = start_date
while current <= end_date:
dates.append(current)
if schedule_interval == '@daily':
current += timedelta(days=1)
elif schedule_interval == '@hourly':
current += timedelta(hours=1)
else:
break
return dates
Set up pre-commit hooks for DAG validation
Configure pre-commit hooks to automatically validate DAGs before code commits.
pip install pre-commit
touch .pre-commit-config.yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- repo: https://github.com/psf/black
rev: 23.7.0
hooks:
- id: black
args: [--line-length=88]
files: ^(dags|tests)/.*\.py$
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
- id: flake8
args: [--max-line-length=88, --ignore=E203,W503]
files: ^(dags|tests)/.*\.py$
- repo: local
hooks:
- id: airflow-dag-validation
name: Validate Airflow DAGs
entry: python -c "from airflow.models import DagBag; db = DagBag(); assert len(db.import_errors) == 0, f'DAG errors: {db.import_errors}'"
language: python
files: ^dags/.*\.py$
- id: pytest-dag-tests
name: Run DAG tests
entry: pytest tests/unit/test_dag_validation.py -v
language: python
pass_filenames: false
files: ^(dags|tests)/.*\.py$
pre-commit install
pre-commit run --all-files
Create CI/CD pipeline configuration
Set up GitHub Actions workflow for automated DAG testing and validation.
name: Airflow DAG Tests
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.9, 3.10, 3.11]
services:
postgres:
image: postgres:13
env:
POSTGRES_PASSWORD: airflow
POSTGRES_USER: airflow
POSTGRES_DB: airflow
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Cache pip dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
- name: Install dependencies
run: |
python -m pip install --upgrade pip
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt"
pip install "apache-airflow[postgres,celery]==2.8.1" --constraint "${CONSTRAINT_URL}"
pip install pytest pytest-airflow pytest-mock pytest-cov freezegun
pip install sqlalchemy-utils factory-boy
- name: Set up Airflow
env:
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: False
AIRFLOW__CORE__DAGS_FOLDER: ./dags
run: |
airflow db init
- name: Run DAG validation tests
env:
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: False
AIRFLOW__CORE__DAGS_FOLDER: ./dags
run: |
pytest tests/unit/ -v --cov=dags --cov-report=xml
- name: Run integration tests
env:
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: False
AIRFLOW__CORE__DAGS_FOLDER: ./dags
run: |
pytest tests/integration/ -v -m integration
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: true
Create test execution script
Implement a comprehensive test runner script for local development and CI/CD environments.
#!/bin/bash
set -e
echo "Starting Airflow DAG Testing Suite"
echo "===================================="
Activate virtual environment if it exists
if [ -f "airflow-test-env/bin/activate" ]; then
source airflow-test-env/bin/activate
echo "Activated virtual environment"
fi
Set Airflow environment variables
export AIRFLOW_HOME=/opt/airflow-testing
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=./dags
export AIRFLOW__CORE__UNIT_TEST_MODE=True
Initialize database if needed
if [ ! -f "$AIRFLOW_HOME/airflow.db" ]; then
echo "Initializing Airflow database..."
airflow db init
fi
Run DAG import validation
echo "Validating DAG imports..."
python -c "from airflow.models import DagBag; db = DagBag(); print(f'Loaded {len(db.dags)} DAGs'); assert len(db.import_errors) == 0, f'Import errors: {db.import_errors}'"
Run unit tests
echo "Running unit tests..."
pytest tests/unit/ -v --cov=dags --cov-report=term-missing --cov-report=html
Run integration tests
echo "Running integration tests..."
pytest tests/integration/ -v -m integration
Run all tests with coverage
echo "Running full test suite..."
pytest tests/ -v --cov=dags --cov-report=html --cov-fail-under=80
echo "All tests completed successfully!"
echo "Coverage report generated in htmlcov/index.html"
chmod +x run_tests.sh
Verify your setup
Test your DAG validation framework to ensure all components work correctly.
# Run the complete test suite
./run_tests.sh
Check DAG import validation
python -c "from airflow.models import DagBag; db = DagBag(); print(f'DAGs loaded: {len(db.dags)}'); print(f'Import errors: {len(db.import_errors)}')"
Run specific test categories
pytest tests/unit/test_dag_validation.py -v
pytest tests/integration/ -v -m integration
Check test coverage
pytest tests/ --cov=dags --cov-report=term-missing
Validate pre-commit hooks
pre-commit run --all-files
Your comprehensive Airflow DAG testing framework is now configured. You can integrate this with your existing Airflow monitoring setup and leverage it alongside Kubernetes-based Airflow deployments.
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| ImportError: No module named 'airflow' | Virtual environment not activated | source airflow-test-env/bin/activate |
| Database connection errors | AIRFLOW_HOME not set correctly | export AIRFLOW_HOME=/opt/airflow-testing |
| DAG import errors | Missing dependencies in DAG files | Check dagbag.import_errors for details |
| Test database conflicts | Parallel test execution issues | Use pytest -x --forked for isolation |
| XCom pull failures in tests | Task instance not properly mocked | Mock TaskInstance with return_value set |
| Pre-commit hooks failing | Code style violations | Run black dags/ tests/ to format code |
| Coverage below threshold | Insufficient test coverage | Add tests for uncovered functions/branches |
| Integration tests timeout | Database operations too slow | Use SQLite for faster testing or optimize queries |
Next steps
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Global variables
AIRFLOW_HOME="/opt/airflow-testing"
AIRFLOW_USER="airflow"
PYTHON_VERSION="3.11"
# Usage function
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -h, --help Show this help message"
echo " -u, --user USER Airflow user (default: airflow)"
echo " -d, --dir DIR Airflow home directory (default: /opt/airflow-testing)"
exit 1
}
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
usage
;;
-u|--user)
AIRFLOW_USER="$2"
shift 2
;;
-d|--dir)
AIRFLOW_HOME="$2"
shift 2
;;
*)
echo -e "${RED}Error: Unknown option $1${NC}"
usage
;;
esac
done
# Error cleanup function
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
if id "$AIRFLOW_USER" &>/dev/null; then
userdel -r "$AIRFLOW_USER" 2>/dev/null || true
fi
rm -rf "$AIRFLOW_HOME" 2>/dev/null || true
}
trap cleanup ERR
# Check if running as root
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}Error: This script must be run as root${NC}"
exit 1
fi
# Detect distribution
echo -e "${YELLOW}[1/12] Detecting distribution...${NC}"
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update -y"
PKG_INSTALL="apt install -y"
PKG_UPGRADE="apt upgrade -y"
PYTHON_DEV="python3-dev"
BUILD_TOOLS="build-essential"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
PKG_UPGRADE="dnf upgrade -y"
PYTHON_DEV="python3-devel"
BUILD_TOOLS="gcc gcc-c++ make"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
PKG_UPGRADE="yum upgrade -y"
PYTHON_DEV="python3-devel"
BUILD_TOOLS="gcc gcc-c++ make"
;;
*)
echo -e "${RED}Error: Unsupported distribution: $ID${NC}"
exit 1
;;
esac
else
echo -e "${RED}Error: Cannot detect distribution${NC}"
exit 1
fi
echo -e "${GREEN}Detected: $PRETTY_NAME${NC}"
# Update system packages
echo -e "${YELLOW}[2/12] Updating system packages...${NC}"
$PKG_UPDATE
$PKG_UPGRADE
# Install prerequisites
echo -e "${YELLOW}[3/12] Installing prerequisites...${NC}"
$PKG_INSTALL python3-pip python3-venv $PYTHON_DEV $BUILD_TOOLS postgresql-client
# Create airflow user
echo -e "${YELLOW}[4/12] Creating airflow user...${NC}"
if ! id "$AIRFLOW_USER" &>/dev/null; then
useradd -r -m -s /bin/bash "$AIRFLOW_USER"
fi
# Create Airflow testing environment
echo -e "${YELLOW}[5/12] Creating Airflow testing environment...${NC}"
mkdir -p "$AIRFLOW_HOME"
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME"
chmod 755 "$AIRFLOW_HOME"
# Set up Python virtual environment
echo -e "${YELLOW}[6/12] Setting up Python virtual environment...${NC}"
sudo -u "$AIRFLOW_USER" python3 -m venv "$AIRFLOW_HOME/airflow-test-env"
# Install Airflow and testing dependencies
echo -e "${YELLOW}[7/12] Installing Airflow and testing dependencies...${NC}"
sudo -u "$AIRFLOW_USER" bash -c "
source $AIRFLOW_HOME/airflow-test-env/bin/activate
pip install --upgrade pip
CONSTRAINT_URL=\"https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt\"
pip install \"apache-airflow[postgres,celery]==2.8.1\" --constraint \"\${CONSTRAINT_URL}\"
pip install pytest pytest-airflow pytest-mock pytest-cov freezegun sqlalchemy-utils factory-boy
"
# Set environment variables
echo -e "${YELLOW}[8/12] Configuring environment variables...${NC}"
sudo -u "$AIRFLOW_USER" bash -c "
echo 'export AIRFLOW_HOME=$AIRFLOW_HOME' >> $AIRFLOW_HOME/.bashrc
echo 'source $AIRFLOW_HOME/airflow-test-env/bin/activate' >> $AIRFLOW_HOME/.bashrc
"
# Initialize Airflow test database
echo -e "${YELLOW}[9/12] Initializing Airflow test database...${NC}"
sudo -u "$AIRFLOW_USER" bash -c "
export AIRFLOW_HOME=$AIRFLOW_HOME
source $AIRFLOW_HOME/airflow-test-env/bin/activate
airflow db init
airflow users create \
--username admin \
--firstname Test \
--lastname Admin \
--role Admin \
--email admin@example.com \
--password testpassword123
"
# Create DAG testing directory structure
echo -e "${YELLOW}[10/12] Creating DAG testing directory structure...${NC}"
sudo -u "$AIRFLOW_USER" bash -c "
cd $AIRFLOW_HOME
mkdir -p dags tests/unit tests/integration tests/fixtures tests/conftest tests/utils
touch tests/__init__.py tests/unit/__init__.py tests/integration/__init__.py
"
# Create pytest configuration
echo -e "${YELLOW}[11/12] Creating pytest configuration...${NC}"
cat > "$AIRFLOW_HOME/pytest.ini" << 'EOF'
[tool:pytest]
addopts =
--verbose
--strict-markers
--strict-config
--cov=dags
--cov-report=html
--cov-report=term-missing
--cov-fail-under=80
testpaths = tests
markers =
unit: Unit tests
integration: Integration tests
slow: Slow running tests
dag_validation: DAG validation tests
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
EOF
# Create test configuration and fixtures
cat > "$AIRFLOW_HOME/tests/conftest.py" << 'EOF'
import pytest
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import DagBag, TaskInstance
from airflow.utils.db import create_session
from airflow.configuration import conf
@pytest.fixture(scope="session")
def airflow_db():
"""Create and tear down test database."""
yield
@pytest.fixture
def dagbag():
"""Return DagBag instance for testing."""
return DagBag(dag_folder='dags/', include_examples=False)
@pytest.fixture
def sample_dag():
"""Create a sample DAG for testing."""
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'test_dag',
default_args=default_args,
description='Test DAG for pytest',
schedule_interval=timedelta(days=1),
catchup=False,
tags=['test']
)
return dag
EOF
# Set proper permissions
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME"
find "$AIRFLOW_HOME" -type d -exec chmod 755 {} \;
find "$AIRFLOW_HOME" -type f -exec chmod 644 {} \;
chmod 755 "$AIRFLOW_HOME/airflow-test-env/bin/activate"
# Verification
echo -e "${YELLOW}[12/12] Verifying installation...${NC}"
if sudo -u "$AIRFLOW_USER" bash -c "
export AIRFLOW_HOME=$AIRFLOW_HOME
source $AIRFLOW_HOME/airflow-test-env/bin/activate
airflow version && pytest --version
"; then
echo -e "${GREEN}✓ Airflow and pytest installation verified${NC}"
else
echo -e "${RED}✗ Verification failed${NC}"
exit 1
fi
echo -e "${GREEN}Installation completed successfully!${NC}"
echo -e "${GREEN}Airflow Home: $AIRFLOW_HOME${NC}"
echo -e "${GREEN}Airflow User: $AIRFLOW_USER${NC}"
echo -e "${YELLOW}To activate the environment, run as $AIRFLOW_USER:${NC}"
echo -e "${YELLOW} source $AIRFLOW_HOME/airflow-test-env/bin/activate${NC}"
echo -e "${YELLOW} export AIRFLOW_HOME=$AIRFLOW_HOME${NC}"
Review the script before running. Execute with: bash install.sh