Set up comprehensive monitoring for Apache Airflow with Prometheus metrics collection, StatsD integration, and custom Grafana dashboards. Configure automated alerting for DAG failures, task timeouts, and system health issues.
Prerequisites
- Apache Airflow 2.x installed
- Python 3.8+ with pip
- Root or sudo access
- At least 4GB RAM available
- Basic understanding of Airflow DAGs
What this solves
Apache Airflow generates critical workflow metrics that need monitoring for production environments. This tutorial configures comprehensive Airflow monitoring using Prometheus for metrics collection, StatsD for real-time statistics, and Grafana for visualization. You'll set up automated alerts for DAG failures, task execution issues, and resource bottlenecks to ensure reliable workflow orchestration.
Step-by-step configuration
Install monitoring dependencies
Install Prometheus, the node exporter, and the StatsD daemon. StatsD itself is a Node.js application distributed through npm, not apt, and Grafana ships from its own APT repository rather than the stock Debian/Ubuntu archives.
sudo apt update
sudo apt install -y prometheus prometheus-node-exporter nodejs npm
sudo npm install -g statsd statsd-prometheus-backend
wget -q -O - https://apt.grafana.com/gpg.key | sudo gpg --dearmor -o /usr/share/keyrings/grafana.gpg
echo "deb [signed-by=/usr/share/keyrings/grafana.gpg] https://apt.grafana.com stable main" | sudo tee /etc/apt/sources.list.d/grafana.list
sudo apt update && sudo apt install -y grafana
Install Airflow monitoring dependencies
Install the Python packages needed for Airflow metrics export and StatsD integration. Airflow provides a statsd extra; there is no prometheus extra, so prometheus_client is installed separately.
pip install 'apache-airflow[statsd]'
pip install prometheus_client statsd
Configure Airflow for StatsD metrics
Update $AIRFLOW_HOME/airflow.cfg to enable StatsD metrics export for monitoring DAG and task performance. In Airflow 2.x all StatsD settings live under the [metrics] section; the Airflow 1.x-era statsd_* options under [scheduler] are ignored, and per-component prefixes are not supported, so a single prefix covers the scheduler, webserver, and workers.
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list =
statsd_custom_client_path =
Create StatsD configuration
Configure StatsD to relay Airflow metrics to Prometheus. The configuration file (the install script below writes it to /etc/statsd-config.js) is a JavaScript object literal evaluated by StatsD, not strict JSON.
{
  graphitePort: 2003,
  graphiteHost: "127.0.0.1",
  port: 8125,
  backends: [ "statsd-prometheus-backend" ],
  prometheus: {
    prefix: "airflow_",
    port: 9102
  },
  deleteIdleStats: true,
  deleteGauges: true,
  deleteTimers: true,
  deleteSets: true,
  deleteCounters: true
}
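Most StatsD-to-Prometheus bridges rewrite dotted StatsD names into Prometheus-safe identifiers before applying the `airflow_` prefix configured above. The exact mapping varies by backend, so treat this as an illustrative sketch of the typical sanitization rather than the behavior of any specific package:

```python
import re

def prom_name(statsd_name, prefix="airflow_"):
    """Rewrite a dotted StatsD metric name into a Prometheus-safe identifier
    (dots, dashes, and other punctuation become underscores), then prefix it."""
    return prefix + re.sub(r"[^a-zA-Z0-9_]", "_", statsd_name)

print(prom_name("dagrun.duration.success.my-dag"))  # airflow_dagrun_duration_success_my_dag
```

Knowing this mapping helps when writing PromQL later: the scraped name rarely matches the dotted name Airflow logs.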
Configure Airflow metrics endpoint
Create a custom metrics endpoint so the webserver exposes Prometheus-compatible metrics directly. Save the following as a file in the Airflow plugins folder, for example $AIRFLOW_HOME/plugins/prometheus_metrics.py.
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint, Response
from prometheus_client import Gauge, generate_latest, CONTENT_TYPE_LATEST
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import provide_session
from sqlalchemy import func
import logging

log = logging.getLogger(__name__)

# Gauges are set to the current totals on every scrape. Counters incremented
# on each scrape would double-count (and prometheus_client's _value attribute
# is private API), so gauges holding monotonically growing totals are used.
dag_run_total = Gauge('airflow_dag_runs_total', 'Total DAG runs', ['dag_id', 'state'])
task_instance_total = Gauge('airflow_task_instances_total', 'Total task instances', ['dag_id', 'task_id', 'state'])
dag_run_duration = Gauge('airflow_dag_run_duration_seconds', 'Last DAG run duration', ['dag_id'])
task_duration = Gauge('airflow_task_duration_seconds', 'Last task duration', ['dag_id', 'task_id'])

prometheus_blueprint = Blueprint('prometheus', __name__)

@prometheus_blueprint.route('/metrics')
@provide_session
def metrics(session=None):
    """Prometheus metrics endpoint"""
    try:
        # Aggregate run/task counts in the database instead of fetching rows
        for dag_id, state, count in (session.query(DagRun.dag_id, DagRun.state, func.count())
                                     .group_by(DagRun.dag_id, DagRun.state)):
            dag_run_total.labels(dag_id=dag_id, state=str(state)).set(count)
        for dag_id, task_id, state, count in (
                session.query(TaskInstance.dag_id, TaskInstance.task_id,
                              TaskInstance.state, func.count())
                .group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state)):
            task_instance_total.labels(dag_id=dag_id, task_id=task_id, state=str(state)).set(count)
        # Durations of finished runs and tasks (last row wins per label set)
        for dag_run in session.query(DagRun).filter(DagRun.end_date.isnot(None)):
            if dag_run.start_date:
                dag_run_duration.labels(dag_id=dag_run.dag_id).set(
                    (dag_run.end_date - dag_run.start_date).total_seconds())
        for ti in session.query(TaskInstance).filter(TaskInstance.end_date.isnot(None)):
            if ti.start_date:
                task_duration.labels(dag_id=ti.dag_id, task_id=ti.task_id).set(
                    (ti.end_date - ti.start_date).total_seconds())
        return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
    except Exception as e:
        log.error(f"Error generating metrics: {e}")
        return Response("Error generating metrics", status=500)

class PrometheusMetricsPlugin(AirflowPlugin):
    name = "prometheus_metrics"
    flask_blueprints = [prometheus_blueprint]
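To sanity-check what the endpoint returns, a minimal parser for the Prometheus text exposition format is handy. This is an illustrative sketch (it assumes no spaces inside label values); for production use, prometheus_client ships a real parser in `prometheus_client.parser`.

```python
def parse_prom_text(text):
    """Parse Prometheus text exposition into {series: value}, skipping the
    # HELP / # TYPE comment lines; assumes label values contain no spaces."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        series, _, value = line.rpartition(" ")
        samples[series] = float(value)
    return samples

# Example exposition text of the shape the plugin above produces
sample = """\
# HELP airflow_dag_runs_total Total DAG runs
# TYPE airflow_dag_runs_total gauge
airflow_dag_runs_total{dag_id="etl",state="success"} 42
airflow_dag_runs_total{dag_id="etl",state="failed"} 3
"""
metrics = parse_prom_text(sample)
print(metrics['airflow_dag_runs_total{dag_id="etl",state="failed"}'])  # 3.0
```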
Configure Prometheus targets
Add Airflow metrics endpoints to Prometheus scraping configuration for data collection.
global:
  scrape_interval: 15s
  evaluation_interval: 15s
rule_files:
  - "airflow_alerts.yml"
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - localhost:9093
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['localhost:9100']
  - job_name: 'airflow-webserver'
    static_configs:
      - targets: ['localhost:8080']
    metrics_path: '/metrics'
    scrape_interval: 30s
  - job_name: 'airflow-statsd'
    static_configs:
      - targets: ['localhost:9102']
    scrape_interval: 15s
  # No scheduler scrape job: port 8793 is the worker log server, and the
  # scheduler health check server (Airflow 2.4+, port 8974) returns JSON,
  # not the Prometheus text format. Scheduler liveness is tracked through
  # the StatsD scheduler heartbeat metric instead.
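After Prometheus loads this file, the targets API (`GET /api/v1/targets`) reports each job's scrape health. This stdlib-only helper extracts the unhealthy targets from that response; the sample payload below is fabricated for illustration, but the `activeTargets` and `health` fields are the real API shape.

```python
import json

def down_targets(api_response_text):
    """Return [(job, instance)] for scrape targets whose health is not 'up',
    given the JSON body of Prometheus's /api/v1/targets endpoint."""
    data = json.loads(api_response_text)
    return [
        (t["labels"]["job"], t["labels"]["instance"])
        for t in data["data"]["activeTargets"]
        if t["health"] != "up"
    ]

# Fabricated example response with one healthy and one failing target
sample = json.dumps({"status": "success", "data": {"activeTargets": [
    {"labels": {"job": "airflow-statsd", "instance": "localhost:9102"}, "health": "up"},
    {"labels": {"job": "airflow-webserver", "instance": "localhost:8080"}, "health": "down"},
]}})
print(down_targets(sample))  # [('airflow-webserver', 'localhost:8080')]
```

In practice you would feed it the body of `curl http://localhost:9090/api/v1/targets`.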
Create Airflow alerting rules
Define comprehensive alerting rules for DAG failures, task timeouts, and system health issues.
groups:
  - name: airflow_alerts
    rules:
      - alert: AirflowDagFailed
        expr: increase(airflow_dag_runs_total{state="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Airflow DAG {{ $labels.dag_id }} failed"
          description: "DAG {{ $labels.dag_id }} has failed runs in the last 5 minutes"
      - alert: AirflowTaskFailed
        expr: increase(airflow_task_instances_total{state="failed"}[5m]) > 0
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Airflow task {{ $labels.task_id }} in DAG {{ $labels.dag_id }} failed"
          description: "Task {{ $labels.task_id }} in DAG {{ $labels.dag_id }} has failed"
      - alert: AirflowSchedulerDown
        # The heartbeat arrives via StatsD; adjust the metric name to whatever
        # your StatsD-to-Prometheus bridge actually exports
        expr: rate(airflow_scheduler_heartbeat[5m]) == 0 or absent(airflow_scheduler_heartbeat)
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Airflow scheduler is down"
          description: "The Airflow scheduler has not heartbeated for more than 1 minute"
      - alert: AirflowWebserverDown
        expr: up{job="airflow-webserver"} == 0
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Airflow webserver is down"
          description: "Airflow webserver has been down for more than 2 minutes"
      - alert: AirflowDagRunDuration
        expr: airflow_dag_run_duration_seconds > 3600
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Airflow DAG {{ $labels.dag_id }} running too long"
          description: "DAG {{ $labels.dag_id }} has been running for more than 1 hour"
      - alert: AirflowTaskQueueHigh
        expr: airflow_executor_queued_tasks > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High number of queued Airflow tasks"
          description: "More than 100 tasks are queued in the Airflow executor"
      - alert: AirflowDagImportErrors
        expr: airflow_dag_processing_import_errors > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Airflow DAG import errors detected"
          description: "{{ $value }} DAG import errors detected in Airflow"
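The `for:` clauses above mean a condition must hold continuously for the given duration before the alert fires; a breach that resolves in between resets the timer. This toy evaluator is not how Prometheus is implemented, it just illustrates the semantics on a sequence of (timestamp, condition) samples:

```python
def fires_at(samples, for_seconds):
    """Given [(t, condition_true)], return the first t at which the condition
    has held continuously for at least for_seconds, else None. Mimics the
    behaviour of a Prometheus alerting rule's `for:` clause."""
    pending_since = None
    for t, breached in samples:
        if breached:
            if pending_since is None:
                pending_since = t  # alert enters "pending" state
            if t - pending_since >= for_seconds:
                return t           # alert transitions to "firing"
        else:
            pending_since = None   # condition cleared, timer resets
    return None

# Breached at t=0 but resolved at t=30, then breached again from t=60 on:
print(fires_at([(0, True), (30, False), (60, True), (120, True), (180, True)], 60))  # 120
```

This is why `for: 1m` on AirflowDagFailed suppresses one-off scrape blips rather than paging immediately.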
Start monitoring services
Enable and start Prometheus, StatsD, and Grafana. Note that no package ships a statsd systemd unit; create one as shown in the automated install script at the end of this guide before enabling it.
sudo systemctl enable --now prometheus
sudo systemctl enable --now prometheus-node-exporter   # service is named "node_exporter" on RHEL-family installs
sudo systemctl enable --now statsd
sudo systemctl enable --now grafana-server
Restart Airflow services with new configuration
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
Configure Grafana data source
Add Prometheus as a data source in Grafana for Airflow metrics visualization.
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://localhost:9090
    isDefault: true
    editable: false
Create Airflow Grafana dashboard
Import a comprehensive Airflow dashboard with DAG metrics, task performance, and system health panels.
{
"dashboard": {
"id": null,
"title": "Apache Airflow Monitoring",
"tags": ["airflow"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "DAG Runs by State",
"type": "stat",
"targets": [
{
"expr": "sum by (state) (airflow_dag_runs_total)",
"legendFormat": "{{state}}"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
},
{
"id": 2,
"title": "Task Instances by State",
"type": "piechart",
"targets": [
{
"expr": "sum by (state) (airflow_task_instances_total)",
"legendFormat": "{{state}}"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
},
{
"id": 3,
"title": "DAG Run Duration",
"type": "timeseries",
"targets": [
{
"expr": "airflow_dag_run_duration_seconds",
"legendFormat": "{{dag_id}}"
}
],
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
},
{
"id": 4,
"title": "Failed DAGs (Last 24h)",
"type": "table",
"targets": [
{
"expr": "increase(airflow_dag_runs_total{state=\"failed\"}[24h]) > 0",
"format": "table"
}
],
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 16}
},
{
"id": 5,
"title": "System Resources",
"type": "timeseries",
"targets": [
{
"expr": "100 - (avg by (instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
"legendFormat": "CPU Usage %"
},
{
"expr": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
"legendFormat": "Memory Usage %"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 16}
}
],
"time": {"from": "now-24h", "to": "now"},
"refresh": "30s"
}
}
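Panel placement in the JSON above uses Grafana's 24-column grid (`gridPos`). When generating dashboards programmatically, a small layout helper keeps panels from overlapping; `grid_pos` here is a hypothetical convenience function, not part of any Grafana SDK.

```python
def grid_pos(index, w=12, h=8, row_width=24):
    """Compute a Grafana gridPos dict for the index-th panel, filling
    Grafana's 24-unit-wide grid left to right, top to bottom."""
    per_row = row_width // w          # panels that fit side by side
    return {
        "h": h,
        "w": w,
        "x": (index % per_row) * w,   # column offset within the row
        "y": (index // per_row) * h,  # vertical offset in grid units
    }

print(grid_pos(0))  # {'h': 8, 'w': 12, 'x': 0, 'y': 0}
print(grid_pos(3))  # {'h': 8, 'w': 12, 'x': 12, 'y': 8}
```

The half-width panels in the dashboard above follow exactly this pattern: two per row, each 12 units wide and 8 tall.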
Configure Grafana dashboard provisioning
Set up automatic dashboard provisioning to load the Airflow monitoring dashboard on startup.
apiVersion: 1
providers:
  - name: 'airflow'
    orgId: 1
    folder: 'Airflow'
    type: file
    disableDeletion: false
    editable: true
    updateIntervalSeconds: 10
    allowUiUpdates: true
    options:
      path: /etc/grafana/provisioning/dashboards
Set up Grafana alerts
Configure a Grafana notification channel for Airflow alerts. This uses the legacy notifier provisioning format; Grafana 9+ deprecates it in favor of alerting contact points, so adapt accordingly on recent versions.
notifiers:
  - name: email-alerts
    type: email
    uid: email-alerts
    orgId: 1
    isDefault: true
    settings:
      addresses: "admin@example.com"
      subject: "Airflow Alert"
      uploadImage: true
Restart services with new configuration
Restart all services to apply the monitoring configuration and begin collecting metrics.
sudo systemctl restart prometheus
sudo systemctl restart grafana-server
sudo systemctl restart statsd
Verify services are running
sudo systemctl status prometheus grafana-server statsd
Verify your setup
Check that all monitoring components are working correctly and collecting Airflow metrics.
# Check Prometheus targets
curl http://localhost:9090/api/v1/targets
Verify Airflow metrics endpoint
curl http://localhost:8080/metrics
Check StatsD metrics
curl http://localhost:9102/metrics
Test Grafana connectivity
curl http://localhost:3000/api/health
Verify Airflow services
sudo systemctl status airflow-webserver airflow-scheduler
Query an Airflow metric through Prometheus
promtool query instant http://localhost:9090 'airflow_dag_runs_total'
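The curl checks above can be bundled into a single stdlib-only Python sweep. The endpoint list mirrors what was configured earlier (`/-/healthy` is Prometheus's own liveness path); unreachable services simply come back as False rather than raising.

```python
from urllib.request import urlopen
from urllib.error import URLError

def check_endpoints(endpoints, timeout=3):
    """Return {name: bool} reachability for each monitoring HTTP endpoint,
    treating connection errors and non-200 responses as failures."""
    results = {}
    for name, url in endpoints.items():
        try:
            with urlopen(url, timeout=timeout) as resp:
                results[name] = resp.status == 200
        except (URLError, OSError):
            results[name] = False
    return results

status = check_endpoints({
    "prometheus": "http://localhost:9090/-/healthy",
    "grafana": "http://localhost:3000/api/health",
    "statsd-exporter": "http://localhost:9102/metrics",
    "airflow-metrics": "http://localhost:8080/metrics",
})
print(status)
```

Any False entry points you at the matching row in the troubleshooting table below.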
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Airflow metrics not appearing | StatsD configuration incorrect | Check airflow.cfg statsd settings and restart services |
| Prometheus can't scrape targets | Firewall blocking ports | Open ports 8080 and 9102 for the Airflow metrics endpoints |
| Grafana dashboard shows no data | Prometheus data source misconfigured | Verify Prometheus URL in datasource configuration |
| Alerts not firing | Alert rules syntax errors | Use promtool check rules airflow_alerts.yml |
| StatsD metrics missing | StatsD backend not configured | Install the backend with npm install -g statsd-prometheus-backend |
| Task duration metrics empty | Task instances not completing | Check task logs and DAG execution history |
Next steps
Automated install script
Run the script below as root to automate the entire setup.
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Global variables
AIRFLOW_HOME="${AIRFLOW_HOME:-/opt/airflow}"
AIRFLOW_USER="${AIRFLOW_USER:-airflow}"
# Usage function
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -h, --help Show this help message"
echo " --airflow-home PATH Airflow home directory (default: /opt/airflow)"
echo " --airflow-user USER Airflow user (default: airflow)"
exit 1
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
usage
;;
--airflow-home)
AIRFLOW_HOME="$2"
shift 2
;;
--airflow-user)
AIRFLOW_USER="$2"
shift 2
;;
*)
echo -e "${RED}Unknown option: $1${NC}"
usage
;;
esac
done
# Error handling with cleanup
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
systemctl stop prometheus node_exporter grafana-server statsd 2>/dev/null || true
}
trap cleanup ERR
# Check prerequisites
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}This script must be run as root${NC}"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update"
PKG_INSTALL="apt install -y"
GRAFANA_REPO_CMD="wget -q -O /usr/share/keyrings/grafana.key https://apt.grafana.com/gpg.key && echo 'deb [signed-by=/usr/share/keyrings/grafana.key] https://apt.grafana.com stable main' > /etc/apt/sources.list.d/grafana.list"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
GRAFANA_REPO_CMD="cat > /etc/yum.repos.d/grafana.repo << 'EOF'
[grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
EOF"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
GRAFANA_REPO_CMD="cat > /etc/yum.repos.d/grafana.repo << 'EOF'
[grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
EOF"
;;
*)
echo -e "${RED}Unsupported distribution: $ID${NC}"
exit 1
;;
esac
else
echo -e "${RED}Cannot detect distribution${NC}"
exit 1
fi
echo -e "${GREEN}[1/8] Updating package repositories...${NC}"
$PKG_UPDATE
echo -e "${GREEN}[2/8] Installing system dependencies...${NC}"
case "$ID" in
ubuntu|debian)
$PKG_INSTALL prometheus prometheus-node-exporter nodejs npm python3-pip
;;
*)
$PKG_INSTALL epel-release
$PKG_INSTALL prometheus node_exporter nodejs npm python3-pip
;;
esac
echo -e "${GREEN}[3/8] Installing Grafana...${NC}"
eval "$GRAFANA_REPO_CMD"
$PKG_UPDATE
$PKG_INSTALL grafana
echo -e "${GREEN}[4/8] Installing StatsD and Node.js dependencies...${NC}"
npm install -g statsd@0.8.6 statsd-prometheus-backend
echo -e "${GREEN}[5/8] Installing Airflow monitoring dependencies...${NC}"
pip3 install 'apache-airflow[statsd]' prometheus_client statsd
echo -e "${GREEN}[6/8] Creating Airflow user and directories...${NC}"
if ! id "$AIRFLOW_USER" &>/dev/null; then
useradd -r -s /bin/bash -d "$AIRFLOW_HOME" -m "$AIRFLOW_USER"
fi
mkdir -p "$AIRFLOW_HOME/dags" "$AIRFLOW_HOME/logs" "$AIRFLOW_HOME/plugins"
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME"
echo -e "${GREEN}[7/8] Configuring monitoring services...${NC}"
# Create StatsD configuration
cat > /etc/statsd-config.js << 'EOF'
{
"graphitePort": 2003,
"graphiteHost": "127.0.0.1",
"port": 8125,
"backends": ["statsd-prometheus-backend"],
"prometheus": {
"prefix": "airflow_",
"port": 9102
},
"deleteIdleStats": true,
"deleteGauges": true,
"deleteTimers": true,
"deleteSets": true,
"deleteCounters": true
}
EOF
# Create StatsD systemd service
cat > /etc/systemd/system/statsd.service << 'EOF'
[Unit]
Description=StatsD Network Daemon
After=network.target
[Service]
Type=simple
User=nobody
ExecStart=/usr/bin/node /usr/lib/node_modules/statsd/stats.js /etc/statsd-config.js
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
# Configure Prometheus: appended job entries must be indented so they sit
# under the existing scrape_configs: key of the distribution-provided file
cat >> /etc/prometheus/prometheus.yml << 'EOF'
  - job_name: 'airflow-statsd'
    static_configs:
      - targets: ['localhost:9102']
  - job_name: 'airflow-metrics'
    static_configs:
      - targets: ['localhost:8080']
    metrics_path: '/admin/metrics'
EOF
# Create Airflow metrics plugin
mkdir -p "$AIRFLOW_HOME/plugins"
cat > "$AIRFLOW_HOME/plugins/prometheus_metrics.py" << 'EOF'
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from prometheus_client import Gauge, Counter
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import provide_session
import logging
log = logging.getLogger(__name__)
# Define metrics
dag_run_total = Counter('airflow_dag_runs_total', 'Total DAG runs', ['dag_id', 'state'])
task_instance_total = Counter('airflow_task_instances_total', 'Total task instances', ['dag_id', 'task_id', 'state'])
dag_run_duration = Gauge('airflow_dag_run_duration_seconds', 'DAG run duration', ['dag_id'])
prometheus_blueprint = Blueprint('prometheus_metrics', __name__)
@prometheus_blueprint.route('/admin/metrics')
@provide_session
def metrics(session=None):
try:
return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)
except Exception as e:
log.error(f"Error generating metrics: {e}")
return Response("Error generating metrics", status=500)
class PrometheusMetricsPlugin(AirflowPlugin):
name = "prometheus_metrics"
flask_blueprints = [prometheus_blueprint]
EOF
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME/plugins/prometheus_metrics.py"
chmod 644 "$AIRFLOW_HOME/plugins/prometheus_metrics.py"
# Configure Airflow for metrics
cat > "$AIRFLOW_HOME/airflow.cfg" << EOF
[core]
dags_folder = $AIRFLOW_HOME/dags
base_log_folder = $AIRFLOW_HOME/logs
plugins_folder = $AIRFLOW_HOME/plugins
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
[webserver]
base_url = http://localhost:8080
EOF
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME/airflow.cfg"
chmod 644 "$AIRFLOW_HOME/airflow.cfg"
echo -e "${GREEN}[8/8] Starting and enabling services...${NC}"
systemctl daemon-reload
systemctl enable --now prometheus grafana-server statsd
systemctl enable --now prometheus-node-exporter 2>/dev/null || systemctl enable --now node_exporter
systemctl restart prometheus
# Configure firewall
if command -v firewall-cmd &> /dev/null && systemctl is-active firewalld &> /dev/null; then
firewall-cmd --permanent --add-port=3000/tcp --add-port=9090/tcp --add-port=9102/tcp
firewall-cmd --reload
fi
# Verify installation
echo -e "${GREEN}Verifying installation...${NC}"
sleep 5
if systemctl is-active --quiet prometheus && systemctl is-active --quiet grafana-server && systemctl is-active --quiet statsd; then
echo -e "${GREEN}✓ All services are running${NC}"
echo -e "${GREEN}✓ Prometheus: http://localhost:9090${NC}"
echo -e "${GREEN}✓ Grafana: http://localhost:3000 (admin/admin)${NC}"
echo -e "${GREEN}✓ StatsD metrics: http://localhost:9102/metrics${NC}"
echo -e "${YELLOW}Note: Start Airflow webserver and scheduler to see metrics${NC}"
else
echo -e "${RED}Some services failed to start. Check logs with: journalctl -u <service>${NC}"
exit 1
fi
Review the script before running. It must run as root; execute with: sudo bash install.sh