Set up Git-based version control for Apache Airflow DAGs with automated CI/CD pipelines for testing and deployment. Implement DAG synchronization, validation workflows, and production-ready deployment strategies.
What this solves
Managing Apache Airflow DAGs across development, staging, and production environments without version control leads to inconsistent deployments and difficult rollbacks. This tutorial configures Git-based DAG management with automated CI/CD pipelines for testing, validation, and deployment.
Prerequisites
- Apache Airflow installation with PostgreSQL backend, plus admin access to configure Git synchronization
- Git installed on the system
- Python 3.9 or higher
- sudo access for systemd configuration
- Basic familiarity with CI/CD concepts
Step-by-step configuration
Install Git sync dependencies
Install Git and required Python packages for repository synchronization and validation.
sudo apt update
sudo apt install -y git python3-pip
pip install gitpython pre-commit pylint
Create DAG repository structure
Set up a dedicated Git repository with proper directory structure for DAG files, tests, and configuration.
mkdir -p /opt/airflow-dags
cd /opt/airflow-dags
git init
mkdir -p dags tests config scripts
touch README.md .gitignore
Configure gitignore for Airflow
Create a comprehensive .gitignore file to exclude temporary files and sensitive data.
# Airflow specific
__pycache__/
*.pyc
*.pyo
*.log
.airflow_db_initialized
airflow.cfg
unittest.cfg
webserver_config.py
# Environment and secrets
.env
*.env
secrets/
# IDE and system
.vscode/
.idea/
.DS_Store
Thumbs.db
# Testing
.pytest_cache/
.coverage
htmlcov/
# Temporary files
*.tmp
*.swp
*.bak
Create DAG validation script
Build a validation script (saved as scripts/validate_dags.py) to check DAG syntax and import errors before deployment.
#!/usr/bin/env python3
import sys
import importlib.util
import ast
from pathlib import Path

def validate_dag_file(dag_file):
    """Validate a single DAG file for syntax and imports."""
    try:
        # Check syntax
        with open(dag_file, 'r') as f:
            ast.parse(f.read())
        # Check imports
        spec = importlib.util.spec_from_file_location("dag_module", dag_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Check for a DAG object
        has_dag = any(hasattr(getattr(module, attr), 'dag_id')
                      for attr in dir(module)
                      if not attr.startswith('_'))
        if not has_dag:
            print(f"Warning: {dag_file} may not contain a valid DAG object")
            return False
        print(f"✓ {dag_file} is valid")
        return True
    except SyntaxError as e:
        print(f"✗ Syntax error in {dag_file}: {e}")
        return False
    except Exception as e:
        print(f"✗ Import error in {dag_file}: {e}")
        return False

def main():
    dags_dir = Path("dags")
    if not dags_dir.exists():
        print("Error: dags directory not found")
        sys.exit(1)
    dag_files = list(dags_dir.glob("**/*.py"))
    if not dag_files:
        print("No DAG files found")
        sys.exit(0)
    valid_count = 0
    for dag_file in dag_files:
        if validate_dag_file(dag_file):
            valid_count += 1
    print(f"\nValidation complete: {valid_count}/{len(dag_files)} DAGs valid")
    if valid_count != len(dag_files):
        sys.exit(1)

if __name__ == "__main__":
    main()
Make validation script executable
Set proper permissions and test the validation script functionality.
chmod +x /opt/airflow-dags/scripts/validate_dags.py
cd /opt/airflow-dags
python3 scripts/validate_dags.py
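If the repository has no DAGs yet, the validator simply reports nothing to check. To see it pass on real input, add a minimal DAG to dags/. The sketch below is a hypothetical dags/example_etl.py (names and schedule are placeholders); it also satisfies the owner, email, retry, tag, and timeout checks that the test suite later in this tutorial enforces.
# dags/example_etl.py -- hypothetical example; names and schedule are placeholders
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-team",
    "email": ["alerts@example.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="example_etl",
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "etl"],
) as dag:
    extract = BashOperator(
        task_id="extract",
        bash_command="echo 'extracting data'",
        execution_timeout=timedelta(minutes=10),
    )
Note that the validator imports each file, so Apache Airflow must be installed in the environment where you run it.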
Configure Git sync script
Create an automated script (saved as scripts/git_sync.py) to synchronize DAGs from the Git repository to the Airflow DAGs directory.
#!/usr/bin/env python3
import os
import sys
import subprocess
import shutil
import time
from pathlib import Path

class GitSync:
    def __init__(self, repo_path, dags_path, branch='main'):
        self.repo_path = Path(repo_path)
        self.dags_path = Path(dags_path)
        self.branch = branch

    def run_command(self, cmd, cwd=None):
        """Execute a shell command and return its output."""
        try:
            result = subprocess.run(
                cmd.split(),
                cwd=cwd or self.repo_path,
                capture_output=True,
                text=True,
                check=True
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError as e:
            print(f"Command failed: {cmd}")
            print(f"Error: {e.stderr}")
            raise

    def sync_from_git(self):
        """Pull the latest changes from Git and sync them to Airflow."""
        print(f"Starting Git sync from {self.branch} branch...")
        # Pull latest changes
        self.run_command(f"git fetch origin {self.branch}")
        self.run_command(f"git reset --hard origin/{self.branch}")
        # Validate DAGs before sync
        print("Validating DAGs...")
        result = subprocess.run(
            ["python3", "scripts/validate_dags.py"],
            cwd=self.repo_path
        )
        if result.returncode != 0:
            print("DAG validation failed. Aborting sync.")
            return False
        # Backup current DAGs
        backup_path = self.dags_path.parent / f"dags_backup_{int(time.time())}"
        if self.dags_path.exists():
            shutil.copytree(self.dags_path, backup_path)
            print(f"Current DAGs backed up to {backup_path}")
        # Sync DAGs
        source_dags = self.repo_path / "dags"
        if source_dags.exists():
            if self.dags_path.exists():
                shutil.rmtree(self.dags_path)
            shutil.copytree(source_dags, self.dags_path)
            print(f"DAGs synced to {self.dags_path}")
            # Set proper permissions
            self.run_command(f"chown -R airflow:airflow {self.dags_path}")
            self.run_command(f"chmod -R 755 {self.dags_path}")
            return True
        else:
            print("No dags directory found in repository")
            return False

    def get_current_commit(self):
        """Get the current Git commit hash."""
        return self.run_command("git rev-parse HEAD")

def main():
    repo_path = os.environ.get('AIRFLOW_DAGS_REPO', '/opt/airflow-dags')
    dags_path = os.environ.get('AIRFLOW_DAGS_PATH', '/opt/airflow/dags')
    branch = os.environ.get('AIRFLOW_DAGS_BRANCH', 'main')
    sync = GitSync(repo_path, dags_path, branch)
    try:
        if sync.sync_from_git():
            commit = sync.get_current_commit()
            print(f"Sync completed successfully. Current commit: {commit[:8]}")
        else:
            print("Sync failed")
            sys.exit(1)
    except Exception as e:
        print(f"Sync error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
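Note that sync_from_git leaves a timestamped dags_backup_* directory behind on every run, so backups accumulate indefinitely. A small housekeeping sketch, assuming the default /opt/airflow layout and an arbitrary retention of five backups (scripts/prune_backups.py is hypothetical):
#!/usr/bin/env python3
# prune_backups.py -- hypothetical housekeeping sketch; KEEP is an arbitrary choice
import shutil
from pathlib import Path

KEEP = 5  # assumption: retain only the five most recent backups
backup_parent = Path("/opt/airflow")  # parent of the dags/ directory used above

# Epoch timestamps in the names sort chronologically as strings (same width)
backups = sorted(backup_parent.glob("dags_backup_*"))
for old in backups[:-KEEP]:
    shutil.rmtree(old)
    print(f"Removed old backup: {old}")
You could run this from cron or a second systemd timer alongside the sync job.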
Configure systemd service for Git sync
Create a systemd service (/etc/systemd/system/airflow-git-sync.service) to automate DAG synchronization from the Git repository.
[Unit]
Description=Airflow DAG Git Sync
After=network.target
[Service]
Type=oneshot
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow-dags
Environment=AIRFLOW_DAGS_REPO=/opt/airflow-dags
Environment=AIRFLOW_DAGS_PATH=/opt/airflow/dags
Environment=AIRFLOW_DAGS_BRANCH=main
ExecStart=/usr/bin/python3 /opt/airflow-dags/scripts/git_sync.py
StandardOutput=journal
StandardError=journal
Create systemd timer for periodic sync
Set up a systemd timer (/etc/systemd/system/airflow-git-sync.timer) to automatically sync DAGs every 5 minutes.
[Unit]
Description=Run Airflow DAG Git Sync every 5 minutes
Requires=airflow-git-sync.service
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
Unit=airflow-git-sync.service
[Install]
WantedBy=timers.target
Enable Git sync automation
Enable and start the systemd timer for automated DAG synchronization.
sudo systemctl daemon-reload
sudo systemctl enable airflow-git-sync.timer
sudo systemctl start airflow-git-sync.timer
sudo systemctl status airflow-git-sync.timer
Configure pre-commit hooks
Create a .pre-commit-config.yaml file in the repository root to validate DAGs and enforce code quality before commits.
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-json
      - id: check-merge-conflict
      - id: check-ast
      - id: debug-statements
  - repo: https://github.com/psf/black
    rev: 23.3.0
    hooks:
      - id: black
        language_version: python3
        files: '^dags/.*\.py$'
  - repo: https://github.com/pycqa/flake8
    rev: 6.0.0
    hooks:
      - id: flake8
        files: '^dags/.*\.py$'
        args: ['--max-line-length=88', '--ignore=E203,W503']
  - repo: local
    hooks:
      - id: validate-dags
        name: Validate Airflow DAGs
        entry: python3 scripts/validate_dags.py
        language: system
        pass_filenames: false
        files: '^dags/.*\.py$'
Install pre-commit hooks
Initialize pre-commit hooks in the repository for automated validation.
cd /opt/airflow-dags
pre-commit install
pre-commit run --all-files
Create GitHub Actions workflow
Set up a CI/CD pipeline for automated testing and deployment using GitHub Actions. Create the workflow directory and save the following workflow file (for example, as .github/workflows/ci.yml).
mkdir -p .github/workflows
name: Airflow DAG CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  validate-dags:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install apache-airflow==2.7.0
          pip install -r requirements.txt
      - name: Validate DAG syntax
        run: python3 scripts/validate_dags.py
      - name: Run DAG tests
        run: |
          python -m pytest tests/ -v
      - name: Check code style
        run: |
          flake8 dags/ --max-line-length=88
          black --check dags/
  deploy-staging:
    needs: validate-dags
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to staging
        run: |
          echo "Deploying to staging environment"
          # Add staging deployment commands
  deploy-production:
    needs: validate-dags
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy to production
        run: |
          echo "Deploying to production environment"
          # Add production deployment commands
Create DAG testing framework
Set up a pytest-based testing framework for comprehensive DAG validation (saved, for example, as tests/test_dag_integrity.py).
import pytest
from pathlib import Path
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle

DAGS_PATH = Path(__file__).parent.parent / "dags"

class TestDAGIntegrity:
    """Test suite for DAG integrity and configuration."""

    def setup_method(self):
        self.dagbag = DagBag(dag_folder=str(DAGS_PATH), include_examples=False)

    def test_no_import_errors(self):
        """Test that all DAGs can be imported without errors."""
        assert len(self.dagbag.import_errors) == 0, (
            f"DAG import errors: {self.dagbag.import_errors}"
        )

    def test_no_cycles(self):
        """Test that DAGs don't have circular dependencies."""
        for dag_id, dag in self.dagbag.dags.items():
            try:
                check_cycle(dag)
            except Exception as e:
                pytest.fail(f"DAG {dag_id} has circular dependencies: {e}")

    def test_dag_has_owner(self):
        """Test that all DAGs have an owner specified."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.owner != 'airflow', (
                f"DAG {dag_id} should have a specific owner, not the default 'airflow'"
            )

    def test_dag_has_email(self):
        """Test that all DAGs have email notifications configured."""
        for dag_id, dag in self.dagbag.dags.items():
            default_args = dag.default_args or {}
            assert 'email' in default_args, (
                f"DAG {dag_id} should have email configured in default_args"
            )

    def test_dag_has_retries(self):
        """Test that all DAGs have retry configuration."""
        for dag_id, dag in self.dagbag.dags.items():
            default_args = dag.default_args or {}
            assert 'retries' in default_args, (
                f"DAG {dag_id} should have retries configured in default_args"
            )
            assert default_args.get('retries', 0) > 0, (
                f"DAG {dag_id} should have retries > 0"
            )

    def test_dag_has_tags(self):
        """Test that all DAGs have appropriate tags."""
        for dag_id, dag in self.dagbag.dags.items():
            assert dag.tags, f"DAG {dag_id} should have tags for categorization"

    def test_task_timeout_configured(self):
        """Test that tasks have timeout configurations."""
        for dag_id, dag in self.dagbag.dags.items():
            for task in dag.tasks:
                # Check that the task sets an execution timeout
                has_timeout = (
                    hasattr(task, 'execution_timeout') and task.execution_timeout
                )
                assert has_timeout, (
                    f"Task {task.task_id} in DAG {dag_id} should have a timeout configured"
                )
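The integrity suite applies blanket rules to every DAG; you can also pin down expectations for individual DAGs. A minimal sketch, assuming the hypothetical example_etl DAG shown earlier, saved for example as tests/test_example_etl.py:
from pathlib import Path

from airflow.models import DagBag

DAGS_PATH = Path(__file__).parent.parent / "dags"

def test_example_etl_structure():
    """The example DAG should load and contain the expected task."""
    dagbag = DagBag(dag_folder=str(DAGS_PATH), include_examples=False)
    dag = dagbag.get_dag("example_etl")
    assert dag is not None, "example_etl failed to load"
    assert "extract" in dag.task_ids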
Create requirements file
Define Python dependencies for the DAG repository, testing framework, and webhook server.
apache-airflow==2.7.0
pytest==7.4.0
pytest-mock==3.11.1
black==23.3.0
flake8==6.0.0
pre-commit==3.3.3
gitpython==3.1.32
flask==2.3.2
Configure environment-specific deployment
Create configuration files for different deployment environments, for example config/staging.env and config/production.env.
# config/staging.env
AIRFLOW_DAGS_BRANCH=develop
AIRFLOW_ENVIRONMENT=staging
SMTP_HOST=smtp.staging.example.com
DATABASE_URL=postgresql://airflow:password@db-staging.example.com/airflow
# config/production.env
AIRFLOW_DAGS_BRANCH=main
AIRFLOW_ENVIRONMENT=production
SMTP_HOST=smtp.example.com
DATABASE_URL=postgresql://airflow:password@db-prod.example.com/airflow
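These files are plain KEY=value pairs, so deployment scripts can load them without extra dependencies. A minimal sketch of such a loader (the load_env helper is illustrative, not part of the tutorial's scripts):
import os
from pathlib import Path

def load_env(path):
    """Load KEY=value lines from an env file into os.environ, skipping comments."""
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            os.environ.setdefault(key.strip(), value.strip())

load_env("config/staging.env")
print(os.environ["AIRFLOW_DAGS_BRANCH"])  # develop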
Set up deployment webhook
Create a webhook endpoint (saved as scripts/webhook_server.py) for triggering deployments from Git events.
#!/usr/bin/env python3
import os
import hashlib
import hmac
import subprocess
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-secret-key')

def verify_signature(payload_body, signature_header):
    """Verify the GitHub webhook signature."""
    if not signature_header:
        return False
    sha_name, signature = signature_header.split('=')
    if sha_name != 'sha256':
        return False
    mac = hmac.new(
        WEBHOOK_SECRET.encode(),
        msg=payload_body,
        digestmod=hashlib.sha256
    )
    return hmac.compare_digest(mac.hexdigest(), signature)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle GitHub webhook for DAG deployment."""
    signature_header = request.headers.get('X-Hub-Signature-256')
    if not verify_signature(request.data, signature_header):
        return jsonify({'error': 'Invalid signature'}), 403
    payload = request.json
    # Only process push events to main or develop branches
    if payload.get('ref') in ['refs/heads/main', 'refs/heads/develop']:
        try:
            # Trigger Git sync
            result = subprocess.run(
                ['systemctl', 'start', 'airflow-git-sync.service'],
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                return jsonify({'status': 'success', 'message': 'Deployment triggered'})
            else:
                return jsonify({'status': 'error', 'message': result.stderr}), 500
        except Exception as e:
            return jsonify({'status': 'error', 'message': str(e)}), 500
    return jsonify({'status': 'ignored', 'message': 'Not a deployment branch'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
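Because the endpoint rejects unsigned requests, exercising it by hand means computing the same HMAC signature GitHub would send. A stdlib-only sketch, assuming the server runs locally and the secret matches the service configuration below:
#!/usr/bin/env python3
# Hypothetical test client for the webhook endpoint above
import hashlib
import hmac
import json
import urllib.error
import urllib.request

secret = b"your-secure-webhook-secret"  # must match WEBHOOK_SECRET
payload = json.dumps({"ref": "refs/heads/main"}).encode()
signature = "sha256=" + hmac.new(secret, payload, hashlib.sha256).hexdigest()

req = urllib.request.Request(
    "http://localhost:8080/webhook",
    data=payload,
    headers={
        "Content-Type": "application/json",
        "X-Hub-Signature-256": signature,
    },
)
try:
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode())
except urllib.error.HTTPError as e:
    print(e.code, e.read().decode())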
Configure webhook service
Set up a systemd service (/etc/systemd/system/airflow-webhook.service) for the webhook server to handle deployment triggers.
[Unit]
Description=Airflow DAG Deployment Webhook
After=network.target
[Service]
Type=exec
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow-dags
Environment=WEBHOOK_SECRET=your-secure-webhook-secret
ExecStart=/usr/bin/python3 /opt/airflow-dags/scripts/webhook_server.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
Enable webhook service
Start and enable the webhook service for automatic deployment handling.
sudo systemctl daemon-reload
sudo systemctl enable --now airflow-webhook.service
sudo systemctl status airflow-webhook.service
Initialize Git repository
Commit initial configuration and push to remote repository.
cd /opt/airflow-dags
git add .
git commit -m "Initial Airflow DAG repository setup with CI/CD"
git remote add origin https://github.com/yourusername/airflow-dags.git
git push -u origin main
Configure DAG security and isolation
For production environments, implement additional security measures, as covered in our Airflow DAG security tutorial, to ensure proper access control and isolation between different DAG workflows.
Verify your setup
# Check Git sync service status
sudo systemctl status airflow-git-sync.timer
# Test DAG validation
cd /opt/airflow-dags
python3 scripts/validate_dags.py
# Check webhook service (expect HTTP 403 from curl without a valid signature)
sudo systemctl status airflow-webhook.service
curl -X POST http://localhost:8080/webhook
# Verify pre-commit hooks
pre-commit run --all-files
# Check Airflow DAG parsing
airflow dags list
airflow dags show example_dag
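You can also confirm programmatically that the synced folder parses cleanly, which mirrors what the scheduler does. Run this snippet with python3 on the Airflow host (the path assumes the default AIRFLOW_DAGS_PATH used above):
from airflow.models import DagBag

# Parse the live DAGs folder exactly as the scheduler would
bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)
print(f"Parsed {len(bag.dags)} DAGs with {len(bag.import_errors)} import errors")
for path, error in bag.import_errors.items():
    print(f"{path}: {error}")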
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Git sync fails with permission errors | Incorrect file ownership | sudo chown -R airflow:airflow /opt/airflow-dags |
| DAG validation fails on import | Missing Python dependencies | pip install -r requirements.txt |
| Webhook returns 403 error | Invalid signature verification | Check WEBHOOK_SECRET matches GitHub configuration |
| Pre-commit hooks fail | Code formatting issues | black dags/ && flake8 dags/ |
| systemd timer not triggering | Timer not enabled | sudo systemctl enable --now airflow-git-sync.timer |
| DAGs not appearing in UI | Sync path mismatch | Verify AIRFLOW_DAGS_PATH matches Airflow configuration |
Performance optimization
For high-performance DAG execution and monitoring, review our Airflow performance optimization guide, which covers connection pooling, resource tuning, and scaling strategies for production workloads.
Next steps
- Implement comprehensive DAG testing strategies with pytest
- Set up Airflow monitoring with Prometheus and Grafana
- Configure Airflow high availability with CeleryExecutor
- Integrate Airflow with Kubernetes for scalable execution
- Set up automated backup strategies for Airflow metadata
Automated install script
Run the following script to automate the entire setup.
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Configuration
AIRFLOW_HOME=${AIRFLOW_HOME:-/opt/airflow}
DAGS_REPO_PATH="/opt/airflow-dags"
AIRFLOW_USER=${AIRFLOW_USER:-airflow}
GIT_BRANCH=${1:-main}
# Usage message
usage() {
    echo "Usage: $0 [branch_name]"
    echo "  branch_name: Git branch to track (default: main)"
    exit 1
}
# Cleanup function for rollback
cleanup() {
    echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
    if [[ -d "$DAGS_REPO_PATH.backup" ]]; then
        rm -rf "$DAGS_REPO_PATH"
        mv "$DAGS_REPO_PATH.backup" "$DAGS_REPO_PATH" 2>/dev/null || true
    fi
}
trap cleanup ERR
# Check if running as root or with sudo
if [[ $EUID -ne 0 ]]; then
    echo -e "${RED}[ERROR] This script must be run as root or with sudo${NC}"
    exit 1
fi
echo -e "${BLUE}Apache Airflow DAG Version Control Setup${NC}"
echo "========================================"
# Auto-detect distribution
echo -e "${YELLOW}[1/10] Detecting operating system...${NC}"
if [ -f /etc/os-release ]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_INSTALL="apt install -y"
            PKG_UPDATE="apt update"
            PYTHON_CMD="python3"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            PKG_MGR="dnf"
            PKG_INSTALL="dnf install -y"
            PKG_UPDATE="dnf update -y"
            PYTHON_CMD="python3"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_INSTALL="yum install -y"
            PKG_UPDATE="yum update -y"
            PYTHON_CMD="python3"
            ;;
        *)
            echo -e "${RED}Unsupported distribution: $ID${NC}"
            exit 1
            ;;
    esac
    echo -e "${GREEN}Detected: $PRETTY_NAME${NC}"
else
    echo -e "${RED}Cannot detect operating system${NC}"
    exit 1
fi
# Update package repositories
echo -e "${YELLOW}[2/10] Updating package repositories...${NC}"
$PKG_UPDATE
# Install required packages
echo -e "${YELLOW}[3/10] Installing Git and Python dependencies...${NC}"
$PKG_INSTALL git python3-pip
# Create airflow user if it doesn't exist
echo -e "${YELLOW}[4/10] Creating airflow user...${NC}"
if ! id "$AIRFLOW_USER" &>/dev/null; then
    useradd -r -d "$AIRFLOW_HOME" -s /bin/bash "$AIRFLOW_USER"
    echo -e "${GREEN}Created user: $AIRFLOW_USER${NC}"
else
    echo -e "${GREEN}User $AIRFLOW_USER already exists${NC}"
fi
# Install Python packages
echo -e "${YELLOW}[5/10] Installing Python packages...${NC}"
pip3 install gitpython pre-commit pylint
# Backup existing DAGs repo if it exists
if [[ -d "$DAGS_REPO_PATH" ]]; then
    echo -e "${YELLOW}Backing up existing DAGs repository...${NC}"
    mv "$DAGS_REPO_PATH" "$DAGS_REPO_PATH.backup"
fi
# Create DAG repository structure
echo -e "${YELLOW}[6/10] Creating DAG repository structure...${NC}"
mkdir -p "$DAGS_REPO_PATH"
cd "$DAGS_REPO_PATH"
git init --initial-branch="$GIT_BRANCH"
mkdir -p dags tests config scripts
touch README.md
# Create comprehensive .gitignore
echo -e "${YELLOW}[7/10] Creating .gitignore file...${NC}"
cat > .gitignore << 'EOF'
# Airflow specific
__pycache__/
*.pyc
*.pyo
*.log
.airflow_db_initialized
airflow.cfg
unittest.cfg
webserver_config.py
# Environment and secrets
.env
*.env
secrets/
# IDE and system
.vscode/
.idea/
.DS_Store
Thumbs.db
# Testing
.pytest_cache/
.coverage
htmlcov/
# Temporary files
*.tmp
*.swp
*.bak
EOF
# Create DAG validation script
echo -e "${YELLOW}[8/10] Creating DAG validation script...${NC}"
cat > scripts/validate_dags.py << 'EOF'
#!/usr/bin/env python3
import sys
import importlib.util
import ast
from pathlib import Path

def validate_dag_file(dag_file):
    """Validate a single DAG file for syntax and imports."""
    try:
        # Check syntax
        with open(dag_file, 'r') as f:
            ast.parse(f.read())
        # Check imports
        spec = importlib.util.spec_from_file_location("dag_module", dag_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Check for a DAG object
        has_dag = any(hasattr(getattr(module, attr), 'dag_id')
                      for attr in dir(module)
                      if not attr.startswith('_'))
        if not has_dag:
            print(f"Warning: {dag_file} may not contain a valid DAG object")
            return False
        print(f"✓ {dag_file} is valid")
        return True
    except SyntaxError as e:
        print(f"✗ Syntax error in {dag_file}: {e}")
        return False
    except Exception as e:
        print(f"✗ Import error in {dag_file}: {e}")
        return False

def main():
    dags_dir = Path("dags")
    if not dags_dir.exists():
        print("Error: dags directory not found")
        sys.exit(1)
    dag_files = list(dags_dir.glob("**/*.py"))
    if not dag_files:
        print("No DAG files found")
        sys.exit(0)
    valid_count = 0
    for dag_file in dag_files:
        if validate_dag_file(dag_file):
            valid_count += 1
    print(f"\nValidation complete: {valid_count}/{len(dag_files)} DAGs valid")
    if valid_count != len(dag_files):
        sys.exit(1)

if __name__ == "__main__":
    main()
EOF
# Create Git sync script
cat > scripts/git_sync.py << 'EOF'
#!/usr/bin/env python3
import os
import sys
import subprocess
import shutil
from pathlib import Path

class GitSync:
    def __init__(self, repo_path, dags_path, branch='main'):
        self.repo_path = Path(repo_path)
        self.dags_path = Path(dags_path)
        self.branch = branch

    def sync(self):
        """Sync DAGs from the Git repository to the Airflow DAGs directory."""
        try:
            os.chdir(self.repo_path)
            # Pull latest changes
            subprocess.run(['git', 'fetch', 'origin'], check=True)
            subprocess.run(['git', 'reset', '--hard', f'origin/{self.branch}'], check=True)
            # Validate DAGs
            result = subprocess.run([sys.executable, 'scripts/validate_dags.py'])
            if result.returncode != 0:
                print("DAG validation failed. Skipping sync.")
                return False
            # Copy DAGs to the Airflow directory
            if self.dags_path.exists():
                shutil.rmtree(self.dags_path)
            shutil.copytree(self.repo_path / 'dags', self.dags_path)
            print(f"Successfully synced DAGs from {self.branch} branch")
            return True
        except subprocess.CalledProcessError as e:
            print(f"Git sync failed: {e}")
            return False
        except Exception as e:
            print(f"Sync error: {e}")
            return False

if __name__ == "__main__":
    repo_path = sys.argv[1] if len(sys.argv) > 1 else '/opt/airflow-dags'
    dags_path = sys.argv[2] if len(sys.argv) > 2 else '/opt/airflow/dags'
    branch = sys.argv[3] if len(sys.argv) > 3 else 'main'
    syncer = GitSync(repo_path, dags_path, branch)
    success = syncer.sync()
    sys.exit(0 if success else 1)
EOF
# Create systemd service for Git sync
cat > /etc/systemd/system/airflow-dag-sync.service << EOF
[Unit]
Description=Airflow DAG Git Sync
After=network.target
[Service]
Type=oneshot
User=$AIRFLOW_USER
WorkingDirectory=$DAGS_REPO_PATH
ExecStart=$PYTHON_CMD $DAGS_REPO_PATH/scripts/git_sync.py $DAGS_REPO_PATH $AIRFLOW_HOME/dags $GIT_BRANCH
EOF
# Create systemd timer for periodic sync
cat > /etc/systemd/system/airflow-dag-sync.timer << 'EOF'
[Unit]
Description=Run Airflow DAG Git Sync every 5 minutes
Requires=airflow-dag-sync.service
[Timer]
OnCalendar=*:0/5
Persistent=true
[Install]
WantedBy=timers.target
EOF
# Set proper permissions
echo -e "${YELLOW}[9/10] Setting permissions...${NC}"
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$DAGS_REPO_PATH"
chmod 755 "$DAGS_REPO_PATH"
chmod 644 "$DAGS_REPO_PATH"/.gitignore
chmod 755 "$DAGS_REPO_PATH"/scripts/validate_dags.py
chmod 755 "$DAGS_REPO_PATH"/scripts/git_sync.py
chmod 644 /etc/systemd/system/airflow-dag-sync.service
chmod 644 /etc/systemd/system/airflow-dag-sync.timer
# Enable and start systemd services
systemctl daemon-reload
systemctl enable airflow-dag-sync.timer
# Create initial README
cat > "$DAGS_REPO_PATH"/README.md << 'EOF'
# Airflow DAGs Repository
This repository contains Apache Airflow DAG files with version control and CI/CD integration.
## Structure
- `dags/`: Airflow DAG files
- `tests/`: Unit tests for DAGs
- `config/`: Configuration files
- `scripts/`: Utility scripts
## Usage
1. Add your DAG files to the `dags/` directory
2. Commit and push changes to trigger validation
3. DAGs are automatically synced every 5 minutes
## Validation
Run `python3 scripts/validate_dags.py` to validate DAG syntax before deployment.
EOF
# Verification checks
echo -e "${YELLOW}[10/10] Running verification checks...${NC}"
cd "$DAGS_REPO_PATH"
if $PYTHON_CMD scripts/validate_dags.py; then
    echo -e "${GREEN}✓ DAG validation script works${NC}"
else
    echo -e "${RED}✗ DAG validation script failed${NC}"
    exit 1
fi
if systemctl is-enabled airflow-dag-sync.timer &>/dev/null; then
    echo -e "${GREEN}✓ Git sync timer is enabled${NC}"
else
    echo -e "${RED}✗ Git sync timer failed to enable${NC}"
    exit 1
fi
# Remove backup if everything succeeded
rm -rf "$DAGS_REPO_PATH.backup" 2>/dev/null || true
echo
echo -e "${GREEN}========================================${NC}"
echo -e "${GREEN}Installation completed successfully!${NC}"
echo -e "${GREEN}========================================${NC}"
echo
echo "Next steps:"
echo "1. Add a Git remote: cd $DAGS_REPO_PATH && git remote add origin <your-repo-url>"
echo "2. Add DAG files to the dags/ directory"
echo "3. Commit and push your changes"
echo "4. Start the sync timer: systemctl start airflow-dag-sync.timer"
echo
echo "Repository location: $DAGS_REPO_PATH"
echo "Sync frequency: Every 5 minutes"
echo "Branch tracking: $GIT_BRANCH"
Review the script before running. Execute with: bash install.sh