Configure comprehensive Apache Airflow monitoring with the DataDog agent to track DAG performance, task execution metrics, and resource utilization, backed by custom dashboards and automated alerting for production workflow management.
Prerequisites
- Apache Airflow installed and running
- DataDog account with API key
- Python 3.8+ with pip
- Root or sudo access
What this solves
Apache Airflow generates extensive metrics about DAG execution, task performance, and system resource usage, but these metrics aren't automatically collected or visualized. DataDog provides comprehensive monitoring for Airflow deployments, tracking everything from task success rates to scheduler performance. This integration helps you identify bottlenecks, monitor SLA compliance, and maintain healthy workflow orchestration in production environments.
Step-by-step configuration
Install DataDog agent
Download and install the DataDog agent using the official installation script. Replace YOUR_API_KEY with your actual DataDog API key from the DataDog console.
DD_API_KEY=YOUR_API_KEY bash -c "$(curl -L https://install.datadoghq.com/scripts/install_script_agent7.sh)"
sudo systemctl enable datadog-agent
sudo systemctl start datadog-agent
Configure Airflow metrics collection
Enable Airflow's StatsD metrics by configuring the airflow.cfg file. This allows Airflow to send metrics to the DataDog agent's StatsD server.
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list = scheduler,executor,dagrun,taskinstance
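Once StatsD is on, Airflow ships metrics as small plain-text UDP datagrams. The sketch below shows the datagram format so you can recognize what arrives on port 8125; the metric names are illustrative examples, not an exhaustive list of what Airflow emits.

```python
# Sketch of the plain-text StatsD datagram format Airflow emits once
# statsd_on is enabled: "<prefix>.<metric>:<value>|<type>".
# Metric names below are illustrative examples only.

def statsd_datagram(prefix: str, metric: str, value: float, metric_type: str = "g") -> str:
    """Build a StatsD datagram like the ones Airflow sends to port 8125."""
    return f"{prefix}.{metric}:{value}|{metric_type}"

# Gauge: a point-in-time value
print(statsd_datagram("airflow", "scheduler.tasks.running", 3))  # airflow.scheduler.tasks.running:3|g

# Counter: incremented on events such as task failures
print(statsd_datagram("airflow", "ti_failures", 1, "c"))         # airflow.ti_failures:1|c
```

The `statsd_prefix` setting above is what produces the leading `airflow.` in every metric name.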
Create DataDog Airflow integration configuration
Configure the DataDog agent to collect Airflow metrics by creating the integration configuration file.
init_config:

instances:
  - url: http://localhost:8080
    username: admin
    password: admin
    tags:
      - environment:production
      - airflow_cluster:main
    collect_health_metrics: true
    collect_task_metrics: true
    collect_dag_metrics: true
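The integration polls the Airflow webserver configured in `url`. Airflow 2's webserver exposes a `/health` endpoint returning JSON such as `{"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy"}}`, which you can check yourself. This is a hedged sketch: `parse_health` and `fetch_health` are hypothetical helpers, and the base URL assumes the same `localhost:8080` webserver as the config above.

```python
import json
import urllib.request

def parse_health(payload: str) -> dict:
    """Return {component: status} from an Airflow /health JSON payload."""
    data = json.loads(payload)
    return {component: info.get("status") for component, info in data.items()}

def fetch_health(base_url: str = "http://localhost:8080") -> dict:
    """Query the webserver's /health endpoint (requires a running Airflow)."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=5) as resp:
        return parse_health(resp.read().decode())

# Offline demonstration with a sample payload:
sample = '{"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy"}}'
print(parse_health(sample))  # {'metadatabase': 'healthy', 'scheduler': 'healthy'}
```

If this endpoint reports unhealthy components, the DataDog integration's health metrics will reflect the same state.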
Configure DataDog agent for StatsD
Enable and configure the DataDog agent's DogStatsD server to receive metrics from Airflow.
# DogStatsD configuration (datadog.yaml uses flat top-level keys)
use_dogstatsd: true
bind_host: localhost
dogstatsd_port: 8125
dogstatsd_non_local_traffic: false

# Tags for all metrics
tags:
  - datacenter:us-east-1
  - environment:production
  - service:airflow
Create custom Airflow metrics script
Create a custom script to collect additional Airflow metrics that aren't available through the standard integration.
#!/usr/bin/env python3
from datetime import datetime, timedelta

from airflow import settings
from airflow.models import DagRun, TaskInstance, DagModel
from datadog import initialize, statsd
from sqlalchemy import func

# Initialize the DataDog StatsD client
options = {
    'statsd_host': 'localhost',
    'statsd_port': 8125,
}
initialize(**options)

def collect_airflow_metrics():
    session = settings.Session()
    try:
        # Count active DAGs
        active_dags = session.query(DagModel).filter(DagModel.is_active == True).count()
        statsd.gauge('airflow.dags.active', active_dags)

        # Count running DAG runs
        running_dag_runs = session.query(DagRun).filter(DagRun.state == 'running').count()
        statsd.gauge('airflow.dag_runs.running', running_dag_runs)

        # Count failed tasks in the last hour
        one_hour_ago = datetime.utcnow() - timedelta(hours=1)
        failed_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed',
            TaskInstance.end_date >= one_hour_ago
        ).count()
        statsd.gauge('airflow.tasks.failed_last_hour', failed_tasks)

        # Average successful task duration per DAG over the last hour
        for dag in session.query(DagModel).filter(DagModel.is_active == True):
            avg_duration = session.query(TaskInstance).filter(
                TaskInstance.dag_id == dag.dag_id,
                TaskInstance.state == 'success',
                TaskInstance.end_date >= one_hour_ago
            ).with_entities(func.avg(TaskInstance.duration)).scalar()
            if avg_duration:
                statsd.gauge(f'airflow.dag.{dag.dag_id}.avg_task_duration', float(avg_duration))
    finally:
        session.close()

if __name__ == '__main__':
    collect_airflow_metrics()
Set up automated metrics collection
Create a systemd timer to run the custom metrics collection script every minute.
# /etc/systemd/system/airflow-metrics.service
[Unit]
Description=Airflow Custom Metrics Collection
After=network.target

[Service]
Type=oneshot
User=airflow
Group=airflow
ExecStart=/usr/bin/python3 /opt/airflow/scripts/custom_metrics.py
Environment=AIRFLOW_HOME=/opt/airflow
WorkingDirectory=/opt/airflow

# /etc/systemd/system/airflow-metrics.timer
[Unit]
Description=Run Airflow Custom Metrics Collection
Requires=airflow-metrics.service

[Timer]
OnBootSec=1min
OnUnitActiveSec=1min
Unit=airflow-metrics.service

[Install]
WantedBy=timers.target
Enable custom metrics collection
Enable and start the systemd timer for automated metrics collection.
sudo systemctl daemon-reload
sudo systemctl enable airflow-metrics.timer
sudo systemctl start airflow-metrics.timer
sudo systemctl status airflow-metrics.timer
Configure log collection
Configure DataDog to collect Airflow logs for centralized log analysis and alerting.
logs:
  - type: file
    path: "/opt/airflow/logs/scheduler/*.log"
    service: airflow-scheduler
    source: airflow
    log_processing_rules:
      - type: multi_line
        name: airflow_scheduler
        pattern: '\d{4}-\d{2}-\d{2}'
  - type: file
    path: "/opt/airflow/logs/*/*/*/*.log"
    service: airflow-tasks
    source: airflow
    tags:
      - log_type:task_execution
  - type: file
    path: "/opt/airflow/logs/dag_processor_manager/*.log"
    service: airflow-dag-processor
    source: airflow
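The `multi_line` rule tells the agent to start a new log entry whenever a line begins with a date matching `\d{4}-\d{2}-\d{2}`; lines that don't match (such as traceback lines) are appended to the previous entry, so stack traces arrive in DataDog as a single event. The sample log lines below are illustrative.

```python
import re

# Same pattern as the log_processing_rules entry above
NEW_ENTRY = re.compile(r'\d{4}-\d{2}-\d{2}')

def group_entries(lines):
    """Group raw log lines into multi-line entries the way the agent does."""
    entries = []
    for line in lines:
        if NEW_ENTRY.match(line):
            entries.append([line])       # date-led line starts a new entry
        elif entries:
            entries[-1].append(line)     # continuation line (e.g. traceback)
    return entries

lines = [
    "2024-01-15 10:00:01 ERROR Task failed",
    "Traceback (most recent call last):",
    '  File "dag.py", line 10, in run',
    "2024-01-15 10:00:05 INFO Heartbeat",
]
print(len(group_entries(lines)))  # 2
```

Without the rule, each traceback line would be ingested as its own (contextless) log event.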
Restart DataDog agent
Restart the DataDog agent to apply all configuration changes and begin collecting metrics.
sudo systemctl restart datadog-agent
sudo systemctl status datadog-agent
sudo datadog-agent status
Restart Airflow services
Restart Airflow components to enable StatsD metrics collection.
sudo systemctl restart airflow-scheduler
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-worker
Create custom DataDog dashboards
Import Airflow dashboard template
Use the DataDog API or web interface to create a comprehensive Airflow monitoring dashboard. Save this JSON configuration for dashboard creation.
{
  "title": "Apache Airflow Performance Monitor",
  "description": "Comprehensive Airflow monitoring dashboard",
  "widgets": [
    {
      "definition": {
        "type": "timeseries",
        "requests": [
          {
            "q": "avg:airflow.dag_runs.running{*}",
            "display_type": "line",
            "style": {
              "palette": "dog_classic",
              "line_type": "solid",
              "line_width": "normal"
            }
          }
        ],
        "title": "Running DAG Runs",
        "show_legend": false
      },
      "layout": {
        "x": 0,
        "y": 0,
        "width": 4,
        "height": 2
      }
    },
    {
      "definition": {
        "type": "query_value",
        "requests": [
          {
            "q": "avg:airflow.tasks.failed_last_hour{*}",
            "aggregator": "last"
          }
        ],
        "title": "Failed Tasks (Last Hour)",
        "autoscale": true,
        "precision": 0
      },
      "layout": {
        "x": 4,
        "y": 0,
        "width": 2,
        "height": 2
      }
    }
  ]
}
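You can also create the dashboard programmatically. This is a hedged sketch using only the standard library: it POSTs a payload to the DataDog v1 dashboard endpoint, which expects a top-level `layout_type`; `YOUR_API_KEY`/`YOUR_APP_KEY` are placeholders and `dashboard_payload` is a hypothetical helper.

```python
import json
import urllib.request

def dashboard_payload(title, description, widgets):
    """Wrap a widget list in the envelope the v1 dashboard API expects."""
    return {
        "title": title,
        "description": description,
        "layout_type": "ordered",
        "widgets": widgets,
    }

def create_dashboard(payload, api_key, app_key):
    """POST the dashboard definition to DataDog (requires valid keys)."""
    req = urllib.request.Request(
        "https://api.datadoghq.com/api/v1/dashboard",
        data=json.dumps(payload).encode(),
        headers={
            "Content-Type": "application/json",
            "DD-API-KEY": api_key,
            "DD-APPLICATION-KEY": app_key,
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

payload = dashboard_payload("Apache Airflow Performance Monitor",
                            "Comprehensive Airflow monitoring dashboard", [])
print(payload["layout_type"])  # ordered
```

Paste the widget definitions from the JSON above into the `widgets` list before calling `create_dashboard`.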
Configure alerting and notifications
Create Airflow performance alerts
Set up DataDog monitors to alert on critical Airflow performance issues and failures.
curl -X POST "https://api.datadoghq.com/api/v1/monitor" \
  -H "Content-Type: application/json" \
  -H "DD-API-KEY: YOUR_API_KEY" \
  -H "DD-APPLICATION-KEY: YOUR_APP_KEY" \
  -d '{
    "type": "metric alert",
    "query": "avg(last_5m):avg:airflow.tasks.failed_last_hour{*} > 10",
    "name": "High Airflow Task Failure Rate",
    "message": "@slack-alerts Airflow is experiencing high task failure rates. Current: {{value}} failed tasks in the last hour.",
    "tags": ["service:airflow", "alert_type:performance"],
    "options": {
      "thresholds": {
        "critical": 10,
        "warning": 5
      },
      "notify_audit": false,
      "require_full_window": true,
      "new_host_delay": 300,
      "include_tags": true,
      "escalation_message": "@pagerduty-airflow Airflow task failures continue to exceed threshold."
    }
  }'
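To make the monitor semantics concrete: DataDog averages the metric over the evaluation window (`last_5m`) and compares the result against the warning and critical thresholds. The function below is a hypothetical stand-in for illustration, not DataDog's actual evaluation engine.

```python
def evaluate(window_values, warning=5, critical=10):
    """Mimic avg(last_5m) threshold evaluation for a window of data points."""
    avg = sum(window_values) / len(window_values)
    if avg > critical:
        return "ALERT"
    if avg > warning:
        return "WARN"
    return "OK"

print(evaluate([12, 14, 11, 13, 15]))  # ALERT (avg 13 > 10)
print(evaluate([6, 7, 6, 5, 7]))       # WARN  (avg 6.2 > 5)
print(evaluate([1, 0, 2, 1, 0]))       # OK
```

With `require_full_window: true`, DataDog waits for a full five minutes of data before evaluating, which avoids false alarms from a single early data point.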
Configure scheduler health monitoring
Create alerts to monitor Airflow scheduler health and responsiveness.
{
  "type": "service check",
  "query": "\"airflow.scheduler.heartbeat\".over(\"*\").last(2).count_by_status()",
  "name": "Airflow Scheduler Health Check",
  "message": "@slack-critical The Airflow scheduler appears to be down or unresponsive. Please check the scheduler service immediately.",
  "tags": ["service:airflow", "component:scheduler"],
  "options": {
    "thresholds": {
      "ok": 1,
      "critical": 1
    },
    "no_data_timeframe": 10,
    "notify_no_data": true
  }
}
Set up SLA violation alerts
Configure monitoring for DAG SLA violations to ensure workflow compliance. The query below searches Airflow logs for SLA-related entries; adjust the search term to match the SLA-miss messages your Airflow version emits.
{
  "type": "log alert",
  "query": "logs(\"service:airflow source:airflow SLA\").index(\"*\").rollup(\"count\").by(\"dag_id\").last(\"15m\") > 0",
  "name": "Airflow SLA Violations",
  "message": "@team-data-engineering SLA violation detected for DAG: {{dag_id.name}}. Review task performance and resource allocation.",
  "tags": ["service:airflow", "alert_type:sla"],
  "options": {
    "enable_logs_sample": true,
    "escalation_message": "@manager-data SLA violations continue for DAG: {{dag_id.name}}"
  }
}
Verify your setup
# Check DataDog agent status
sudo datadog-agent status

# Verify the Airflow integration is collecting metrics
sudo datadog-agent check airflow

# Check custom metrics collection
sudo systemctl status airflow-metrics.timer
sudo journalctl -u airflow-metrics.service -n 20

# Send a test StatsD metric (-w1 makes nc exit after one second)
echo "airflow.test.metric:1|c" | nc -u -w1 localhost 8125

# Confirm StatsD is enabled in Airflow
airflow config get-value metrics statsd_on
Visit your DataDog dashboard to confirm metrics are flowing and alerts are configured. You should see Airflow metrics under the "Metrics Explorer" and can create custom dashboards using the collected data.
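If `nc` isn't installed, the same StatsD smoke test can be done in Python. This hedged sketch stands up a local UDP listener (simulating DogStatsD) on an ephemeral port, sends a counter datagram, and confirms it arrives intact; point the client at port 8125 to exercise the real agent instead.

```python
import socket

def statsd_roundtrip(metric="airflow.test.metric:1|c"):
    """Send a StatsD datagram to a local UDP listener and return what arrives."""
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(("127.0.0.1", 0))            # ephemeral port, not the real 8125
    server.settimeout(2)
    port = server.getsockname()[1]
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        client.sendto(metric.encode(), ("127.0.0.1", port))
        received, _ = server.recvfrom(1024)
        return received.decode()
    finally:
        client.close()
        server.close()

print(statsd_roundtrip())  # airflow.test.metric:1|c
```

A successful round trip rules out local UDP problems, leaving agent configuration as the remaining suspect if metrics still don't appear.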
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No metrics in DataDog | StatsD not enabled in Airflow | Check statsd_on = True in airflow.cfg and restart services |
| Permission denied on log files | DataDog agent can't read logs | sudo chown -R dd-agent:airflow /opt/airflow/logs |
| Custom metrics script fails | Missing Python dependencies | pip install datadog apache-airflow |
| High metric ingestion costs | Too many custom metrics | Filter metrics using statsd_allow_list in airflow.cfg |
| Dashboard shows no data | Incorrect metric names | Use DataDog Metrics Explorer to verify metric names |
Next steps
- Configure Apache Airflow high availability with CeleryExecutor for production scale
- Implement Grafana advanced alerting with webhooks for additional monitoring layers
- Configure Apache Airflow DAG security and secrets management to secure your workflows
- Set up Airflow distributed monitoring with Prometheus for multi-cluster deployments
- Implement Airflow performance optimization with connection pooling to reduce bottlenecks
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Apache Airflow DataDog Monitoring Setup Script
# Configures DataDog agent integration with Apache Airflow for comprehensive monitoring
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Default values
DD_API_KEY="${1:-}"
AIRFLOW_USER="${2:-airflow}"
AIRFLOW_HOME="${3:-/opt/airflow}"
AIRFLOW_WEB_USER="${4:-admin}"
AIRFLOW_WEB_PASS="${5:-admin}"
usage() {
echo "Usage: $0 <DD_API_KEY> [airflow_user] [airflow_home] [web_user] [web_pass]"
echo " DD_API_KEY: DataDog API key (required)"
echo " airflow_user: Airflow system user (default: airflow)"
echo " airflow_home: Airflow installation directory (default: /opt/airflow)"
echo " web_user: Airflow web UI username (default: admin)"
echo " web_pass: Airflow web UI password (default: admin)"
exit 1
}
log() {
echo -e "${GREEN}[INFO]${NC} $1"
}
warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
error() {
echo -e "${RED}[ERROR]${NC} $1" >&2
}
cleanup() {
if [ $? -ne 0 ]; then
error "Installation failed. Cleaning up..."
systemctl stop datadog-agent 2>/dev/null || true
rm -f /etc/datadog-agent/conf.d/airflow.d/conf.yaml
rm -f /opt/datadog-agent/embedded/bin/airflow_metrics.py
fi
}
trap cleanup ERR
# Validate arguments
if [ -z "$DD_API_KEY" ]; then
usage
fi
# Check if running as root
if [ "$EUID" -ne 0 ]; then
error "This script must be run as root"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
;;
almalinux|rocky|centos|rhel|ol)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf check-update || true"
;;
fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf check-update || true"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum check-update || true"
;;
*)
error "Unsupported distribution: $ID"
exit 1
;;
esac
else
error "Cannot detect distribution - /etc/os-release not found"
exit 1
fi
log "Detected distribution: $ID using $PKG_MGR"
echo "[1/8] Updating package repositories..."
$PKG_UPDATE
echo "[2/8] Installing prerequisites..."
$PKG_INSTALL curl wget python3 python3-pip
echo "[3/8] Installing DataDog agent..."
DD_API_KEY="$DD_API_KEY" bash -c "$(curl -L https://install.datadoghq.com/scripts/install_script_agent7.sh)"
systemctl enable datadog-agent
systemctl start datadog-agent
echo "[4/8] Configuring DataDog agent for StatsD..."
cat > /etc/datadog-agent/datadog.yaml.tmp << EOF
api_key: $DD_API_KEY
site: datadoghq.com

# DogStatsD configuration (flat top-level keys)
use_dogstatsd: true
bind_host: localhost
dogstatsd_port: 8125
dogstatsd_non_local_traffic: false

# Tags for all metrics
tags:
  - datacenter:$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone 2>/dev/null || echo "unknown")
  - environment:production
  - service:airflow

logs_enabled: true

process_config:
  enabled: true
EOF
# Back up any existing datadog.yaml before replacing it
if [ -f /etc/datadog-agent/datadog.yaml ]; then
cp /etc/datadog-agent/datadog.yaml /etc/datadog-agent/datadog.yaml.bak
fi
mv /etc/datadog-agent/datadog.yaml.tmp /etc/datadog-agent/datadog.yaml
chown dd-agent:dd-agent /etc/datadog-agent/datadog.yaml
chmod 640 /etc/datadog-agent/datadog.yaml
echo "[5/8] Creating Airflow integration configuration..."
mkdir -p /etc/datadog-agent/conf.d/airflow.d
cat > /etc/datadog-agent/conf.d/airflow.d/conf.yaml << EOF
init_config:

instances:
  - url: http://localhost:8080
    username: $AIRFLOW_WEB_USER
    password: $AIRFLOW_WEB_PASS
    tags:
      - environment:production
      - airflow_cluster:main
    collect_health_metrics: true
    collect_task_metrics: true
    collect_dag_metrics: true
EOF
chown dd-agent:dd-agent /etc/datadog-agent/conf.d/airflow.d/conf.yaml
chmod 644 /etc/datadog-agent/conf.d/airflow.d/conf.yaml
echo "[6/8] Configuring Airflow metrics collection..."
if [ -f "$AIRFLOW_HOME/airflow.cfg" ]; then
# Backup existing config
cp "$AIRFLOW_HOME/airflow.cfg" "$AIRFLOW_HOME/airflow.cfg.bak.$(date +%Y%m%d_%H%M%S)"
# Update metrics configuration
if grep -q "\[metrics\]" "$AIRFLOW_HOME/airflow.cfg"; then
# Update existing metrics section
sed -i '/^\[metrics\]/,/^\[/ {
s/^statsd_on.*/statsd_on = True/
s/^statsd_host.*/statsd_host = localhost/
s/^statsd_port.*/statsd_port = 8125/
s/^statsd_prefix.*/statsd_prefix = airflow/
s/^statsd_allow_list.*/statsd_allow_list = scheduler,executor,dagrun,taskinstance/
}' "$AIRFLOW_HOME/airflow.cfg"
else
# Add metrics section
cat >> "$AIRFLOW_HOME/airflow.cfg" << EOF
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
statsd_allow_list = scheduler,executor,dagrun,taskinstance
EOF
fi
chown $AIRFLOW_USER:$AIRFLOW_USER "$AIRFLOW_HOME/airflow.cfg"
chmod 644 "$AIRFLOW_HOME/airflow.cfg"
log "Updated Airflow configuration at $AIRFLOW_HOME/airflow.cfg"
else
warn "Airflow configuration file not found at $AIRFLOW_HOME/airflow.cfg"
warn "Please manually add metrics configuration to airflow.cfg"
fi
echo "[7/8] Creating custom metrics collection script..."
mkdir -p /opt/datadog-agent/embedded/bin
cat > /opt/datadog-agent/embedded/bin/airflow_metrics.py << 'EOF'
#!/usr/bin/env python3
import socket
import sys
from datetime import datetime, timedelta

try:
    from airflow import settings
    from airflow.models import DagRun, TaskInstance, DagModel
except ImportError:
    print("Airflow not found. Please ensure Airflow is installed and accessible.")
    sys.exit(1)

def send_metric(metric_name, value, metric_type='gauge', host='localhost', port=8125):
    """Send a metric to the local DogStatsD server over UDP."""
    type_codes = {'gauge': 'g', 'count': 'c'}
    message = f"{metric_name}:{value}|{type_codes[metric_type]}"
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.sendto(message.encode(), (host, port))
    finally:
        sock.close()

def collect_airflow_metrics():
    session = settings.Session()
    try:
        # Count active DAGs
        active_dags = session.query(DagModel).filter(DagModel.is_active == True).count()
        send_metric('airflow.dags.active', active_dags)

        # Count running DAG runs
        running_dag_runs = session.query(DagRun).filter(DagRun.state == 'running').count()
        send_metric('airflow.dag_runs.running', running_dag_runs)

        # Count failed tasks in the last hour
        one_hour_ago = datetime.utcnow() - timedelta(hours=1)
        failed_tasks = session.query(TaskInstance).filter(
            TaskInstance.state == 'failed',
            TaskInstance.end_date >= one_hour_ago
        ).count()
        send_metric('airflow.tasks.failed_last_hour', failed_tasks)

        print("Custom metrics collected successfully")
    except Exception as e:
        print(f"Error collecting metrics: {e}")
    finally:
        session.close()

if __name__ == "__main__":
    collect_airflow_metrics()
EOF
chmod 755 /opt/datadog-agent/embedded/bin/airflow_metrics.py
chown dd-agent:dd-agent /opt/datadog-agent/embedded/bin/airflow_metrics.py
# Create systemd service for custom metrics
cat > /etc/systemd/system/airflow-datadog-metrics.service << EOF
[Unit]
Description=Airflow DataDog Custom Metrics Collection
After=datadog-agent.service
[Service]
Type=oneshot
User=dd-agent
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
ExecStart=/opt/datadog-agent/embedded/bin/airflow_metrics.py
EOF
# Create timer for regular execution
cat > /etc/systemd/system/airflow-datadog-metrics.timer << EOF
[Unit]
Description=Run Airflow DataDog metrics collection every 5 minutes
Requires=airflow-datadog-metrics.service
[Timer]
OnCalendar=*:0/5
Persistent=true
[Install]
WantedBy=timers.target
EOF
systemctl daemon-reload
systemctl enable airflow-datadog-metrics.timer
systemctl start airflow-datadog-metrics.timer
echo "[8/8] Restarting DataDog agent and verification..."
systemctl restart datadog-agent
# Wait for services to start
sleep 5
# Verify installation
echo "Verifying installation..."
if systemctl is-active --quiet datadog-agent; then
log "✓ DataDog agent is running"
else
error "✗ DataDog agent is not running"
exit 1
fi
if systemctl is-active --quiet airflow-datadog-metrics.timer; then
log "✓ Custom metrics timer is active"
else
warn "✗ Custom metrics timer is not active"
fi
if [ -f /etc/datadog-agent/conf.d/airflow.d/conf.yaml ]; then
log "✓ Airflow integration configured"
else
error "✗ Airflow integration configuration missing"
fi
log "Installation completed successfully!"
log "DataDog agent is now monitoring your Airflow instance."
log "Check your DataDog dashboard for incoming metrics."
log "Custom metrics will be collected every 5 minutes."
Review the script before running. Execute with: sudo bash install.sh YOUR_API_KEY