Configure Apache Airflow DAG version control with Git and CI/CD pipelines

Difficulty: Intermediate · Time: 45 minutes
Applies to: Ubuntu 24.04, Debian 12, AlmaLinux 9, Rocky Linux 9

Set up Git-based version control for Apache Airflow DAGs with automated CI/CD pipelines for testing and deployment. Implement DAG synchronization, validation workflows, and production-ready deployment strategies.

Prerequisites

  • Apache Airflow installation with PostgreSQL backend and admin access
  • Git installed on the system
  • Python 3.9 or higher
  • sudo access for systemd configuration
  • Basic familiarity with CI/CD concepts

What this solves

Managing Apache Airflow DAGs across development, staging, and production environments without version control leads to inconsistent deployments and difficult rollbacks. This tutorial configures Git-based DAG management with automated CI/CD pipelines for testing, validation, and deployment.

Step-by-step configuration

Install Git sync dependencies

Install Git and required Python packages for repository synchronization and validation.

# Debian/Ubuntu
sudo apt update
sudo apt install -y git python3-pip

# AlmaLinux 9 / Rocky Linux 9
sudo dnf install -y git python3-pip

# All distributions: Python tooling used below (on Debian 12 and
# Ubuntu 24.04, run this inside Airflow's virtualenv, since the
# system Python is externally managed)
pip install gitpython pre-commit black flake8

Create DAG repository structure

Set up a dedicated Git repository with proper directory structure for DAG files, tests, and configuration.

mkdir -p /opt/airflow-dags
cd /opt/airflow-dags
git init
mkdir -p dags tests config scripts
touch README.md .gitignore
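
Optionally seed the repository with a minimal DAG so the validation, testing, and sync steps below have something to operate on. The following is an illustrative sketch (the dag_id, owner, and email address are placeholders, not part of the required setup); save it as dags/example_etl.py:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder defaults; adjust owner, email, and retries for your team.
# These values also satisfy the integrity tests defined later in this
# tutorial (owner, email, retries, tags, execution_timeout).
default_args = {
    "owner": "data-engineering",
    "email": ["alerts@example.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

def extract():
    print("extracting data")

with DAG(
    dag_id="example_etl",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["example", "etl"],
) as dag:
    PythonOperator(
        task_id="extract",
        python_callable=extract,
        execution_timeout=timedelta(minutes=10),
    )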

Configure gitignore for Airflow

Create a comprehensive gitignore file to exclude temporary files and sensitive data.

# Airflow specific
__pycache__/
*.pyc
*.pyo
*.log
.airflow_db_initialized
airflow.cfg
unittest.cfg
webserver_config.py

# Environment and secrets
.env
*.env
secrets/

# IDE and system
.vscode/
.idea/
.DS_Store
Thumbs.db

# Testing
.pytest_cache/
.coverage
htmlcov/

# Temporary files
*.tmp
*.swp
*.bak

Create DAG validation script

Build a validation script that checks DAG syntax and import errors before deployment. Save it as scripts/validate_dags.py; because it imports each DAG file, Airflow must be importable in the environment where the script runs.

#!/usr/bin/env python3
import os
import sys
import importlib.util
import ast
from pathlib import Path

def validate_dag_file(dag_file):
    """Validate a single DAG file for syntax and imports."""
    try:
        # Check syntax
        with open(dag_file, 'r') as f:
            ast.parse(f.read())
        
        # Check imports
        spec = importlib.util.spec_from_file_location("dag_module", dag_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        
        # Check for DAG object
        has_dag = any(hasattr(getattr(module, attr), 'dag_id') 
                     for attr in dir(module) 
                     if not attr.startswith('_'))
        
        if not has_dag:
            print(f"Warning: {dag_file} may not contain a valid DAG object")
            return False
        
        print(f"✓ {dag_file} is valid")
        return True
        
    except SyntaxError as e:
        print(f"✗ Syntax error in {dag_file}: {e}")
        return False
    except Exception as e:
        print(f"✗ Import error in {dag_file}: {e}")
        return False

def main():
    dags_dir = Path("dags")
    if not dags_dir.exists():
        print("Error: dags directory not found")
        sys.exit(1)
    
    dag_files = list(dags_dir.glob("**/*.py"))
    if not dag_files:
        print("No DAG files found")
        sys.exit(0)
    
    valid_count = 0
    for dag_file in dag_files:
        if validate_dag_file(dag_file):
            valid_count += 1
    
    print(f"\nValidation complete: {valid_count}/{len(dag_files)} DAGs valid")
    
    if valid_count != len(dag_files):
        sys.exit(1)

if __name__ == "__main__":
    main()

Make validation script executable

Set proper permissions and test the validation script functionality.

chmod +x /opt/airflow-dags/scripts/validate_dags.py
cd /opt/airflow-dags
python3 scripts/validate_dags.py

Configure Git sync script

Create an automated script to synchronize DAGs from the Git repository to the Airflow DAGs directory. Save it as scripts/git_sync.py (the systemd service below expects that path).

#!/usr/bin/env python3
import os
import sys
import subprocess
import shutil
import time
from pathlib import Path

class GitSync:
    def __init__(self, repo_path, dags_path, branch='main'):
        self.repo_path = Path(repo_path)
        self.dags_path = Path(dags_path)
        self.branch = branch
        
    def run_command(self, cmd, cwd=None):
        """Execute shell command and return result."""
        try:
            result = subprocess.run(
                cmd.split(), 
                cwd=cwd or self.repo_path,
                capture_output=True, 
                text=True, 
                check=True
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError as e:
            print(f"Command failed: {cmd}")
            print(f"Error: {e.stderr}")
            raise
    
    def sync_from_git(self):
        """Pull latest changes from Git and sync to Airflow."""
        print(f"Starting Git sync from {self.branch} branch...")
        
        # Pull latest changes
        self.run_command(f"git fetch origin {self.branch}")
        self.run_command(f"git reset --hard origin/{self.branch}")
        
        # Validate DAGs before sync
        print("Validating DAGs...")
        result = subprocess.run(
            ["python3", "scripts/validate_dags.py"],
            cwd=self.repo_path
        )
        
        if result.returncode != 0:
            print("DAG validation failed. Aborting sync.")
            return False
        
        # Backup current DAGs
        backup_path = self.dags_path.parent / f"dags_backup_{int(time.time())}"
        if self.dags_path.exists():
            shutil.copytree(self.dags_path, backup_path)
            print(f"Current DAGs backed up to {backup_path}")
        
        # Sync DAGs
        source_dags = self.repo_path / "dags"
        if source_dags.exists():
            if self.dags_path.exists():
                shutil.rmtree(self.dags_path)
            shutil.copytree(source_dags, self.dags_path)
            print(f"DAGs synced to {self.dags_path}")
            
            # Normalize permissions. Ownership is already correct because
            # this runs as the airflow user; chown would require root here.
            self.run_command(f"chmod -R 755 {self.dags_path}")
            
            return True
        else:
            print("No dags directory found in repository")
            return False
    
    def get_current_commit(self):
        """Get current Git commit hash."""
        return self.run_command("git rev-parse HEAD")

def main():
    repo_path = os.environ.get('AIRFLOW_DAGS_REPO', '/opt/airflow-dags')
    dags_path = os.environ.get('AIRFLOW_DAGS_PATH', '/opt/airflow/dags')
    branch = os.environ.get('AIRFLOW_DAGS_BRANCH', 'main')
    
    sync = GitSync(repo_path, dags_path, branch)
    
    try:
        if sync.sync_from_git():
            commit = sync.get_current_commit()
            print(f"Sync completed successfully. Current commit: {commit[:8]}")
        else:
            print("Sync failed")
            sys.exit(1)
    except Exception as e:
        print(f"Sync error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
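
The sync script keeps timestamped backups but provides no restore path, and easy rollbacks are half the point of version-controlled DAGs. A minimal rollback sketch, assuming backups sit next to the DAGs directory exactly as sync_from_git() creates them; save it, for example, as scripts/rollback_dags.py:

#!/usr/bin/env python3
"""Restore the most recent dags_backup_* snapshot created by git_sync.py.

Illustrative sketch only; paths mirror the defaults used in git_sync.py.
Run it as the airflow user so ownership stays correct.
"""
import os
import shutil
import sys
from pathlib import Path

dags_path = Path(os.environ.get('AIRFLOW_DAGS_PATH', '/opt/airflow/dags'))

# Backups are siblings of the DAGs directory, named dags_backup_<timestamp>;
# lexicographic sort works because the Unix timestamps have equal width
backups = sorted(dags_path.parent.glob('dags_backup_*'))
if not backups:
    print("No backups found")
    sys.exit(1)

latest = backups[-1]
if dags_path.exists():
    shutil.rmtree(dags_path)
shutil.copytree(latest, dags_path)
print(f"Restored {latest} to {dags_path}")

Backups accumulate indefinitely, so prune old dags_backup_* directories on a schedule that fits your retention needs.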

Configure systemd service for Git sync

Create a systemd service to automate DAG synchronization from the Git repository. Save it as /etc/systemd/system/airflow-git-sync.service.

[Unit]
Description=Airflow DAG Git Sync
After=network.target

[Service]
Type=oneshot
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow-dags
Environment=AIRFLOW_DAGS_REPO=/opt/airflow-dags
Environment=AIRFLOW_DAGS_PATH=/opt/airflow/dags
Environment=AIRFLOW_DAGS_BRANCH=main
ExecStart=/usr/bin/python3 /opt/airflow-dags/scripts/git_sync.py
StandardOutput=journal
StandardError=journal

Create systemd timer for periodic sync

Set up a systemd timer to automatically sync DAGs every 5 minutes. Save it as /etc/systemd/system/airflow-git-sync.timer.

[Unit]
Description=Run Airflow DAG Git Sync every 5 minutes
Requires=airflow-git-sync.service

[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
Unit=airflow-git-sync.service

[Install]
WantedBy=timers.target

Enable Git sync automation

Enable and start the systemd timer for automated DAG synchronization.

sudo systemctl daemon-reload
sudo systemctl enable airflow-git-sync.timer
sudo systemctl start airflow-git-sync.timer
sudo systemctl status airflow-git-sync.timer

Configure pre-commit hooks

Set up pre-commit hooks to validate DAGs and enforce code quality before commits. Save the following as .pre-commit-config.yaml in the repository root.

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-json
      - id: check-merge-conflict
      - id: check-ast
      - id: debug-statements
  
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        language_version: python3
        files: '^dags/.*\.py$'
  
  - repo: https://github.com/pycqa/flake8
    rev: 6.0.0
    hooks:
      - id: flake8
        files: '^dags/.*\.py$'
        args: ['--max-line-length=88', '--ignore=E203,W503']
  
  - repo: local
    hooks:
      - id: validate-dags
        name: Validate Airflow DAGs
        entry: python3 scripts/validate_dags.py
        language: system
        pass_filenames: false
        files: '^dags/.*\.py$'

Install pre-commit hooks

Initialize pre-commit hooks in the repository for automated validation.

cd /opt/airflow-dags
pre-commit install
pre-commit run --all-files

Create GitHub Actions workflow

Set up a CI/CD pipeline for automated testing and deployment using GitHub Actions. First create the workflow directory:

mkdir -p .github/workflows

Then save the following as, for example, .github/workflows/ci-cd.yml:

name: Airflow DAG CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  validate-dags:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        # constraints file per the official Airflow install guidance
        pip install "apache-airflow==2.7.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.9.txt"
        pip install -r requirements.txt
    
    - name: Validate DAG syntax
      run: python3 scripts/validate_dags.py
    
    - name: Run DAG tests
      run: |
        python -m pytest tests/ -v
    
    - name: Check code style
      run: |
        flake8 dags/ --max-line-length=88
        black --check dags/
  
  deploy-staging:
    needs: validate-dags
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Deploy to staging
      run: |
        echo "Deploying to staging environment"
        # Add staging deployment commands
  
  deploy-production:
    needs: validate-dags
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Deploy to production
      run: |
        echo "Deploying to production environment"
        # Add production deployment commands

Create DAG testing framework

Set up a pytest-based testing framework for comprehensive DAG validation. Save the following as tests/test_dag_integrity.py.

import pytest
from pathlib import Path
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

DAGS_PATH = Path(__file__).parent.parent / "dags"

class TestDAGIntegrity:
    """Test suite for DAG integrity and configuration."""
    
    def setup_method(self):
        self.dagbag = DagBag(dag_folder=str(DAGS_PATH), include_examples=False)
    
    def test_no_import_errors(self):
        """Test that all DAGs can be imported without errors."""
        assert len(self.dagbag.import_errors) == 0, (
            f"DAG import errors: {self.dagbag.import_errors}"
        )
    
    def test_no_cycles(self):
        """Test that DAGs don't have circular dependencies."""
        for dag_id, dag in self.dagbag.dags.items():
            try:
                check_cycle(dag)
            except Exception as e:
                pytest.fail(f"DAG {dag_id} has circular dependencies: {e}")
    
    def test_dag_has_owner(self):
        """Test that all DAGs have an owner specified."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.owner != 'airflow', (
                f"DAG {dag_id} should have a specific owner, not default 'airflow'"
            )
    
    def test_dag_has_email(self):
        """Test that all DAGs have email notifications configured."""
        for dag_id, dag in self.dagbag.dags.items():
            default_args = dag.default_args or {}
            assert 'email' in default_args, (
                f"DAG {dag_id} should have email configured in default_args"
            )
    
    def test_dag_has_retries(self):
        """Test that all DAGs have retry configuration."""
        for dag_id, dag in self.dagbag.dags.items():
            default_args = dag.default_args or {}
            assert 'retries' in default_args, (
                f"DAG {dag_id} should have retries configured in default_args"
            )
            assert default_args.get('retries', 0) > 0, (
                f"DAG {dag_id} should have retries > 0"
            )
    
    def test_dag_has_tags(self):
        """Test that all DAGs have appropriate tags."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.tags, f"DAG {dag_id} should have tags for categorization"
    
    def test_task_timeout_configured(self):
        """Test that tasks have timeout configurations."""
        for dag_id, dag in self.dagbag.dags.items():
            for task in dag.tasks:
                # execution_timeout is the standard BaseOperator timeout
                assert task.execution_timeout is not None, (
                    f"Task {task.task_id} in DAG {dag_id} should have "
                    f"execution_timeout configured"
                )
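
Beyond these repository-wide checks, it can help to pin down expectations for individual DAGs. A short sketch, assuming the example_etl DAG from earlier in this tutorial; save it as tests/test_example_etl.py:

import pytest
from pathlib import Path
from airflow.models import DagBag

DAGS_PATH = Path(__file__).parent.parent / "dags"

@pytest.fixture(scope="module")
def dagbag():
    # Parse the repository's dags/ folder once for the whole module
    return DagBag(dag_folder=str(DAGS_PATH), include_examples=False)

def test_example_etl_loads(dagbag):
    """The example_etl DAG parses and contains the expected task."""
    dag = dagbag.get_dag("example_etl")
    assert dag is not None, "example_etl failed to load"
    assert "extract" in dag.task_ids

def test_example_etl_no_catchup(dagbag):
    """Catchup stays disabled so a redeploy never backfills silently."""
    dag = dagbag.get_dag("example_etl")
    assert dag.catchup is False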

Create requirements file

Define Python dependencies for the DAG repository, the testing framework, and the webhook server.

apache-airflow==2.7.0
flask==2.3.2
pytest==7.4.0
pytest-mock==3.11.1
black==23.3.0
flake8==6.0.0
pre-commit==3.3.3
gitpython==3.1.32

Configure environment-specific deployment

Create per-environment configuration files, for example config/staging.env and config/production.env. They contain credentials, so the *.env pattern in the gitignore above keeps them out of version control.

config/staging.env:

AIRFLOW_DAGS_BRANCH=develop
AIRFLOW_ENVIRONMENT=staging
SMTP_HOST=smtp.staging.example.com
DATABASE_URL=postgresql://airflow:password@db-staging.example.com/airflow

config/production.env:

AIRFLOW_DAGS_BRANCH=main
AIRFLOW_ENVIRONMENT=production
SMTP_HOST=smtp.example.com
DATABASE_URL=postgresql://airflow:password@db-prod.example.com/airflow
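
These files are not picked up automatically: either reference one from the sync service with an EnvironmentFile= directive, or load it in the script. A minimal stdlib sketch (load_env_file is a hypothetical helper, not part of git_sync.py above):

import os
from pathlib import Path

def load_env_file(path):
    """Hypothetical helper: read KEY=VALUE lines into os.environ.

    Existing environment variables win, so values already set by
    systemd are not overridden. Blank lines and # comments are skipped.
    """
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        os.environ.setdefault(key.strip(), value.strip())

# Example: load staging settings before constructing GitSync
load_env_file('/opt/airflow-dags/config/staging.env')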

Set up deployment webhook

Create a webhook endpoint for triggering deployments from Git events. Save it as scripts/webhook_server.py (the systemd unit below expects that path).

#!/usr/bin/env python3
import os
import json
import hashlib
import hmac
import subprocess
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-secret-key')

def verify_signature(payload_body, signature_header):
    """Verify GitHub webhook signature."""
    if not signature_header:
        return False
    
    sha_name, signature = signature_header.split('=')
    if sha_name != 'sha256':
        return False
    
    mac = hmac.new(
        WEBHOOK_SECRET.encode(),
        msg=payload_body,
        digestmod=hashlib.sha256
    )
    
    return hmac.compare_digest(mac.hexdigest(), signature)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle GitHub webhook for DAG deployment."""
    signature_header = request.headers.get('X-Hub-Signature-256')
    
    if not verify_signature(request.data, signature_header):
        return jsonify({'error': 'Invalid signature'}), 403
    
    payload = request.json
    
    # Only process push events to main or develop branches
    if payload.get('ref') in ['refs/heads/main', 'refs/heads/develop']:
        try:
            # Trigger Git sync. Note: the airflow user needs permission to
            # start this unit, e.g. a polkit rule or a NOPASSWD sudoers
            # entry scoped to this exact command.
            result = subprocess.run(
                ['systemctl', 'start', 'airflow-git-sync.service'],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                return jsonify({'status': 'success', 'message': 'Deployment triggered'})
            else:
                return jsonify({'status': 'error', 'message': result.stderr}), 500
                
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)}), 500
    
    return jsonify({'status': 'ignored', 'message': 'Not a deployment branch'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
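
A bare curl POST is rejected with 403, so exercising the endpoint end to end requires signing the payload the way GitHub does. A stdlib-only test client sketch, assuming the server runs locally on port 8080 (note that a valid refs/heads/main payload really does trigger a sync):

#!/usr/bin/env python3
"""Send a signed test payload to the deployment webhook."""
import hashlib
import hmac
import json
import os
import urllib.request

secret = os.environ.get('WEBHOOK_SECRET', 'your-secret-key')
payload = json.dumps({'ref': 'refs/heads/main'}).encode()

# GitHub signs the raw request body with HMAC-SHA256, prefixed 'sha256='
signature = 'sha256=' + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

req = urllib.request.Request(
    'http://localhost:8080/webhook',
    data=payload,
    headers={
        'Content-Type': 'application/json',
        'X-Hub-Signature-256': signature,
    },
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())

The same signing pattern could back the placeholder deploy steps in the GitHub Actions workflow above.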

Configure webhook service

Set up a systemd service for the webhook server to handle deployment triggers. Save it as /etc/systemd/system/airflow-webhook.service and replace the placeholder WEBHOOK_SECRET with a strong random value.

[Unit]
Description=Airflow DAG Deployment Webhook
After=network.target

[Service]
Type=exec
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow-dags
Environment=WEBHOOK_SECRET=your-secure-webhook-secret
ExecStart=/usr/bin/python3 /opt/airflow-dags/scripts/webhook_server.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target

Enable webhook service

Start and enable the webhook service for automatic deployment handling.

sudo systemctl daemon-reload
sudo systemctl enable --now airflow-webhook.service
sudo systemctl status airflow-webhook.service

Initialize Git repository

Commit initial configuration and push to remote repository.

cd /opt/airflow-dags
git add .
git commit -m "Initial Airflow DAG repository setup with CI/CD"
git remote add origin https://github.com/yourusername/airflow-dags.git
git push -u origin main

Configure DAG security and isolation

For production environments, implement additional security measures as covered in our Airflow DAG security tutorial to ensure proper access control and isolation between different DAG workflows.

Verify your setup

Check Git sync timer

sudo systemctl status airflow-git-sync.timer

Test DAG validation

cd /opt/airflow-dags
python3 scripts/validate_dags.py

Check webhook service

sudo systemctl status airflow-webhook.service
curl -X POST http://localhost:8080/webhook

The unsigned curl request should be rejected with a 403, which confirms signature verification is active; use the signed test client from earlier for a full end-to-end check.

Verify pre-commit hooks

pre-commit run --all-files

Check Airflow DAG parsing

airflow dags list
airflow dags show example_etl

Common issues

Symptom | Cause | Fix
Git sync fails with permission errors | Incorrect file ownership | sudo chown -R airflow:airflow /opt/airflow-dags
DAG validation fails on import | Missing Python dependencies | pip install -r requirements.txt
Webhook returns 403 error | Invalid signature verification | Check that WEBHOOK_SECRET matches the GitHub configuration
Pre-commit hooks fail | Code formatting issues | black dags/ && flake8 dags/
systemd timer not triggering | Timer not enabled | sudo systemctl enable airflow-git-sync.timer
DAGs not appearing in UI | Sync path mismatch | Verify AIRFLOW_DAGS_PATH matches the Airflow configuration

Performance optimization

For high-performance DAG execution and monitoring, review our Airflow performance optimization guide which covers connection pooling, resource tuning, and scaling strategies for production workloads.
