Set up Apache Airflow with Celery executor, Redis cluster backend, HAProxy load balancing, and auto-scaling workers for production-grade workflow orchestration with high availability.
Prerequisites
- 3+ servers with 4GB RAM each
- PostgreSQL database server
- Basic Linux administration knowledge
- Understanding of distributed systems
What this solves
Apache Airflow with CeleryExecutor provides horizontal scaling for workflow processing, but requires proper load balancing and high availability configuration for production use. This tutorial implements a complete high availability Airflow setup with Redis cluster for Celery backend, HAProxy for web interface load balancing, multiple worker nodes, and auto-scaling capabilities to handle varying workloads efficiently.
Step-by-step configuration
Install Redis cluster for Celery backend
Redis cluster provides high availability and automatic failover for Celery message broker. Install Redis on three nodes for cluster setup.
sudo apt update
sudo apt install -y redis-server redis-tools
sudo systemctl enable redis-server
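Worth knowing before relying on the cluster as a Celery backend: Redis Cluster shards keys across masters by CRC16 hash slot, and a Celery queue is a single Redis list key, so the entire `celery` queue lands on exactly one master. The slot mapping is small enough to reproduce; this is an illustrative sketch of the algorithm described in the Redis Cluster specification:

```python
# CRC16 (XMODEM variant), the checksum Redis Cluster uses for key -> slot mapping.
def crc16_xmodem(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: bytes) -> int:
    """Return the hash slot (0..16383) a key maps to, honoring {hash tags}."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        # Only a non-empty {...} section restricts hashing to its contents.
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key) % 16384
```

Keys sharing a `{hash tag}` land in the same slot, which is how related keys can be co-located on one master.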
Configure Redis cluster nodes
Edit the Redis configuration (/etc/redis/redis.conf on Debian/Ubuntu) on each node with cluster-enabled settings. Replace the IP addresses with your actual node IPs, and set cluster-announce-ip to each node's own address.
bind 0.0.0.0
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
cluster-announce-ip 203.0.113.10
cluster-announce-port 7000
appendonly yes
appendfilename "appendonly-7000.aof"
maxmemory 2gb
maxmemory-policy allkeys-lru
Start Redis on all cluster nodes:
sudo systemctl restart redis-server
sudo systemctl status redis-server
Create Redis cluster
Initialize the Redis cluster with three master nodes. Note that --cluster-replicas 0 shards data but provides no automatic failover: if a master fails, its hash slots go offline. For failover, add replica nodes (for example, six nodes with --cluster-replicas 1). Run this command from one node:
redis-cli --cluster create 203.0.113.10:7000 203.0.113.11:7000 203.0.113.12:7000 --cluster-replicas 0
Verify cluster status:
redis-cli -h 203.0.113.10 -p 7000 cluster info
redis-cli -h 203.0.113.10 -p 7000 cluster nodes
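To automate this check, the key:value output of `cluster info` is straightforward to parse. A minimal sketch (the health criteria shown are illustrative):

```python
def parse_cluster_info(raw: str) -> dict:
    """Parse the key:value lines emitted by `redis-cli cluster info`."""
    info = {}
    for line in raw.strip().splitlines():
        if ":" in line:
            key, value = line.split(":", 1)
            info[key.strip()] = value.strip()
    return info


def cluster_healthy(info: dict) -> bool:
    # A healthy cluster reports state ok with all 16384 hash slots assigned.
    return (info.get("cluster_state") == "ok"
            and info.get("cluster_slots_assigned") == "16384")


SAMPLE = """cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_known_nodes:3
cluster_size:3"""
```

Feed it the output of `redis-cli -h <node> -p 7000 cluster info` from a cron job or monitoring agent to alert on degraded state.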
Install Apache Airflow with CeleryExecutor
Install Airflow with Celery executor and Redis provider on all nodes that will run Airflow components.
sudo apt install -y python3-pip python3-venv postgresql-client
python3 -m venv /opt/airflow
source /opt/airflow/bin/activate
# Quote the extras so the shell does not glob them; match the constraints file to your Python version
pip install "apache-airflow[celery,redis,postgres]==2.8.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
pip install redis==4.5.5 celery==5.3.4
Configure Airflow for CeleryExecutor
Configure Airflow to use CeleryExecutor. One caveat: Celery's Redis transport does not speak the Redis Cluster protocol, so broker_url and result_backend must point at a single endpoint rather than a redis+cluster:// URL; for broker failover, Redis Sentinel (sentinel:// URLs with a master_name transport option) is the approach Celery supports. The configuration below, written to /opt/airflow/airflow.cfg, points at the first node.
[core]
executor = CeleryExecutor
dags_folder = /opt/airflow/dags
load_examples = False
max_active_runs_per_dag = 16
max_active_tasks_per_dag = 16
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@203.0.113.20:5432/airflow
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = False
[webserver]
web_server_host = 0.0.0.0
web_server_port = 8080
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
secret_key = your-secret-key-here
[celery]
broker_url = redis://203.0.113.10:7000/0
result_backend = redis://203.0.113.10:7000/0
worker_concurrency = 8
worker_log_server_port = 8793
celery_app_name = airflow.executors.celery_executor
worker_precheck = True
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
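Before copying airflow.cfg to every node, you can sanity-check the rendered values programmatically. A minimal sketch using Python's configparser; the specific checks and the sample snippet are illustrative, not an official validation tool:

```python
import configparser

SAMPLE_CFG = """
[core]
executor = CeleryExecutor
load_examples = False

[celery]
worker_concurrency = 8
"""


def check_airflow_cfg(text):
    """Return a list of problems found in an airflow.cfg snippet."""
    cfg = configparser.RawConfigParser()
    cfg.read_string(text)
    problems = []
    if cfg.get("core", "executor", fallback=None) != "CeleryExecutor":
        problems.append("executor must be CeleryExecutor for this setup")
    if cfg.getboolean("core", "load_examples", fallback=True):
        problems.append("disable load_examples in production")
    if cfg.getint("celery", "worker_concurrency", fallback=0) < 1:
        problems.append("worker_concurrency must be at least 1")
    return problems
```

Running such a check in CI before a config rollout catches copy-paste mistakes before they take down the scheduler.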
Initialize Airflow database
Initialize the Airflow metadata database (airflow db init is deprecated since Airflow 2.7 in favor of airflow db migrate) and create an admin user.
export AIRFLOW_HOME=/opt/airflow
source /opt/airflow/bin/activate
airflow db migrate
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password secure_password
Create systemd services for Airflow components
Create systemd service files for the Airflow webserver, scheduler, and worker. Save each unit under the path shown in its leading comment.
# /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver
After=network.target
[Service]
Environment=AIRFLOW_HOME=/opt/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/opt/airflow/bin/airflow webserver
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler
After=network.target
[Service]
Environment=AIRFLOW_HOME=/opt/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/opt/airflow/bin/airflow scheduler
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow celery worker
After=network.target
[Service]
Environment=AIRFLOW_HOME=/opt/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/opt/airflow/bin/airflow celery worker
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
Create airflow user and set permissions
Create dedicated airflow user and set proper ownership for Airflow directories.
sudo useradd -r -s /bin/false airflow
sudo mkdir -p /opt/airflow/logs /opt/airflow/dags /opt/airflow/plugins
sudo chown -R airflow:airflow /opt/airflow
sudo chmod 755 /opt/airflow
sudo chmod 775 /opt/airflow/logs
Install and configure HAProxy for load balancing
Install HAProxy to load balance traffic across multiple Airflow webserver instances.
sudo apt install -y haproxy
Configure HAProxy for Airflow webserver load balancing:
global
log stdout len 65536 local0
chroot /var/lib/haproxy
stats socket /run/haproxy/admin.sock mode 660 level admin
stats timeout 30s
user haproxy
group haproxy
daemon
maxconn 4096
defaults
mode http
log global
option httplog
option dontlognull
option log-health-checks
option forwardfor
option http-server-close
timeout connect 5000
timeout client 50000
timeout server 50000
errorfile 400 /etc/haproxy/errors/400.http
errorfile 403 /etc/haproxy/errors/403.http
errorfile 408 /etc/haproxy/errors/408.http
errorfile 500 /etc/haproxy/errors/500.http
errorfile 502 /etc/haproxy/errors/502.http
errorfile 503 /etc/haproxy/errors/503.http
errorfile 504 /etc/haproxy/errors/504.http
frontend airflow_frontend
bind *:80
bind *:443 ssl crt /etc/ssl/certs/airflow.pem
redirect scheme https if !{ ssl_fc }
default_backend airflow_webservers
backend airflow_webservers
balance roundrobin
option httpchk GET /health
http-check expect status 200
server airflow1 203.0.113.30:8080 check inter 5s fall 3 rise 2
server airflow2 203.0.113.31:8080 check inter 5s fall 3 rise 2
server airflow3 203.0.113.32:8080 check inter 5s fall 3 rise 2
listen stats
bind *:8404
stats enable
stats uri /stats
stats refresh 30s
stats admin if TRUE
Start Airflow services on webserver nodes
Enable and start Airflow webserver and scheduler services on designated nodes.
sudo systemctl daemon-reload
sudo systemctl enable --now airflow-webserver airflow-scheduler
sudo systemctl status airflow-webserver airflow-scheduler
Start Airflow worker services on worker nodes
Start Celery workers on dedicated worker nodes for task execution.
sudo systemctl enable --now airflow-worker
sudo systemctl status airflow-worker
Configure worker auto-scaling with systemd templates
Create systemd template for dynamic worker scaling based on queue load.
# /etc/systemd/system/airflow-worker@.service
[Unit]
Description=Airflow celery worker %i
After=network.target
[Service]
Environment=AIRFLOW_HOME=/opt/airflow
Environment=CELERYD_NODE_NAME=worker%i
User=airflow
Group=airflow
Type=simple
ExecStart=/opt/airflow/bin/airflow celery worker --hostname worker%i
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
Create scaling script for automatic worker management:
#!/usr/bin/env python3
import subprocess

import redis


def get_queue_length():
    try:
        # Celery's queue is a single Redis list; use the cluster-aware client
        # so MOVED redirects are followed to whichever master owns the slot.
        rc = redis.RedisCluster(host='203.0.113.10', port=7000)
        return rc.llen('celery')
    except Exception as e:
        print(f"Error connecting to Redis: {e}")
        return 0


def get_active_workers():
    try:
        result = subprocess.run(
            ['systemctl', 'list-units', '--state=active', '--plain', 'airflow-worker@*'],
            capture_output=True, text=True)
        return len([line for line in result.stdout.split('\n') if 'airflow-worker@' in line])
    except Exception:
        return 0


def scale_workers(target_workers):
    current_workers = get_active_workers()
    if target_workers > current_workers:
        for i in range(current_workers + 1, target_workers + 1):
            subprocess.run(['systemctl', 'start', f'airflow-worker@{i}'])
            print(f"Started worker {i}")
    elif target_workers < current_workers:
        for i in range(target_workers + 1, current_workers + 1):
            subprocess.run(['systemctl', 'stop', f'airflow-worker@{i}'])
            print(f"Stopped worker {i}")


if __name__ == '__main__':
    queue_length = get_queue_length()
    current_workers = get_active_workers()
    # Scale workers based on queue length
    if queue_length > 50:
        target_workers = min(8, current_workers + 2)
    elif queue_length > 20:
        target_workers = min(6, current_workers + 1)
    elif queue_length > 5:
        target_workers = max(2, current_workers)
    else:
        target_workers = max(1, current_workers - 1)
    print(f"Queue length: {queue_length}, Current workers: {current_workers}, "
          f"Target workers: {target_workers}")
    if target_workers != current_workers:
        scale_workers(target_workers)
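The scaling thresholds are easiest to validate when extracted into a pure function. This sketch mirrors the script's rules (the function name and parameters are illustrative) so the policy can be unit-tested without touching systemd or Redis:

```python
def target_worker_count(queue_length: int, current: int,
                        max_workers: int = 8, min_workers: int = 1) -> int:
    """Pure mirror of the threshold logic in scale-workers.py."""
    if queue_length > 50:
        # Heavy backlog: add two workers, capped at the fleet maximum.
        return min(max_workers, current + 2)
    if queue_length > 20:
        # Moderate backlog: add one worker, capped at six.
        return min(6, current + 1)
    if queue_length > 5:
        # Light backlog: hold steady, but keep at least two workers.
        return max(2, current)
    # Idle: shrink by one, never below the minimum.
    return max(min_workers, current - 1)
```

Keeping the decision logic pure also makes it trivial to experiment with different thresholds before deploying them.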
Save the script as /opt/airflow/bin/scale-workers.py, make it executable, and create a systemd service and timer:
sudo chmod +x /opt/airflow/bin/scale-workers.py
sudo chown airflow:airflow /opt/airflow/bin/scale-workers.py
# /etc/systemd/system/airflow-autoscale.service
[Unit]
Description=Airflow worker autoscaling
[Service]
Type=oneshot
# Runs as root so the script can start/stop airflow-worker@ units via systemctl;
# use the venv's python so the redis module is importable.
ExecStart=/opt/airflow/bin/python3 /opt/airflow/bin/scale-workers.py
# /etc/systemd/system/airflow-autoscale.timer
[Unit]
Description=Run airflow autoscaling every 2 minutes
Requires=airflow-autoscale.service
[Timer]
OnCalendar=*:0/2
Persistent=true
[Install]
WantedBy=timers.target
Enable HAProxy and autoscaling
Start HAProxy load balancer and enable worker autoscaling timer.
sudo systemctl enable --now haproxy
sudo systemctl enable --now airflow-autoscale.timer
sudo systemctl list-timers airflow-autoscale.timer
Configure monitoring and health checks
Set up health check endpoints and monitoring for the Airflow cluster.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'health_check',
    default_args=default_args,
    description='System health check DAG',
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['monitoring', 'health'],
)


def check_redis_cluster():
    import redis
    try:
        # Use the cluster-aware client; a plain Redis client can hit MOVED redirects.
        rc = redis.RedisCluster(host='203.0.113.10', port=7000)
        rc.ping()
        return "Redis cluster healthy"
    except Exception as e:
        raise Exception(f"Redis cluster unhealthy: {e}")


redis_check = PythonOperator(
    task_id='check_redis_cluster',
    python_callable=check_redis_cluster,
    dag=dag,
)

worker_check = BashOperator(
    task_id='check_celery_workers',
    bash_command='celery -A airflow.executors.celery_executor inspect active',
    dag=dag,
)

redis_check >> worker_check
Verify your setup
Test the high availability Airflow deployment with load balancing and auto-scaling.
# Check Redis cluster status
redis-cli -h 203.0.113.10 -p 7000 cluster info
# Check Airflow services
sudo systemctl status airflow-webserver airflow-scheduler airflow-worker
# Check HAProxy status (use -k if the certificate is self-signed)
sudo systemctl status haproxy
curl -kI https://203.0.113.1/health
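HAProxy's httpchk only looks at the HTTP status code, but the body of Airflow's /health endpoint is JSON reporting per-component status, which is useful for finer-grained monitoring. A sketch that interprets it (the payload shape matches Airflow 2.x; the function name is illustrative):

```python
import json


def airflow_healthy(payload: str) -> bool:
    """True when every component in the /health JSON reports 'healthy'."""
    data = json.loads(payload)
    return all(component.get("status") == "healthy"
               for component in data.values()
               if isinstance(component, dict))


# Trimmed examples of the /health response body.
HEALTHY = '{"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy"}}'
DEGRADED = '{"metadatabase": {"status": "healthy"}, "scheduler": {"status": "unhealthy"}}'
```

A scheduler that has stopped heartbeating still returns HTTP 200 with an unhealthy scheduler component, so checking the body catches failures the load balancer misses.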
# Check Celery workers (the airflow CLI has no celery inspect subcommand; use celery directly)
source /opt/airflow/bin/activate
export AIRFLOW_HOME=/opt/airflow
celery -A airflow.executors.celery_executor inspect active
celery -A airflow.executors.celery_executor inspect stats
# Test autoscaling
sudo systemctl status airflow-autoscale.timer
sudo journalctl -u airflow-autoscale.service -f
# Check HAProxy stats
curl http://203.0.113.1:8404/stats
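For scripted monitoring, HAProxy serves the same data as CSV at /stats;csv (for example `curl "http://203.0.113.1:8404/stats;csv"`). A sketch that flags servers not reporting UP; the sample is trimmed to three of the many real columns:

```python
import csv
import io


def down_servers(stats_csv: str):
    """Return (backend, server) pairs not reporting UP from HAProxy CSV stats."""
    text = stats_csv.lstrip("# ")  # the header line begins with '# '
    reader = csv.DictReader(io.StringIO(text))
    return [(row["pxname"], row["svname"])
            for row in reader
            # Skip the aggregate FRONTEND/BACKEND rows; real servers report
            # statuses like "UP", "UP 1/2" (rising), or "DOWN".
            if row["svname"] not in ("FRONTEND", "BACKEND")
            and not row["status"].startswith("UP")]


SAMPLE = """# pxname,svname,status
airflow_webservers,airflow1,UP
airflow_webservers,airflow2,DOWN
airflow_webservers,BACKEND,UP
"""
```

Wiring this into an alerting script gives earlier warning than waiting for users to see 503s.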
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Workers not connecting to Redis | Incorrect cluster configuration | Verify Redis cluster nodes and connectivity with redis-cli cluster info |
| HAProxy returns 503 errors | Airflow webservers down | Check webserver status with systemctl status airflow-webserver |
| Tasks stuck in queued state | No available workers | Check worker status with airflow celery inspect active and scale manually |
| Autoscaling not working | Timer or script issues | Check timer status and logs with systemctl status airflow-autoscale.timer |
| Database connection errors | PostgreSQL connectivity | Verify database connection in airflow.cfg and test with airflow db check |
| Permission denied errors | Incorrect file ownership | Fix with sudo chown -R airflow:airflow /opt/airflow |
Next steps
- Configure Apache Airflow DAG security and secrets management with RBAC policies
- Configure Apache Airflow monitoring with Prometheus alerts and Grafana dashboards
- Implement comprehensive Apache Airflow DAG testing and validation strategies
- Configure Apache Airflow with SSL certificates and NGINX reverse proxy
- Set up Airflow backup and disaster recovery with automated database snapshots
Automated install script
Run this script to automate the entire setup:
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Script configuration
AIRFLOW_HOME="/opt/airflow"
AIRFLOW_USER="airflow"
REDIS_PORT="7000"
REDIS_CLUSTER_NODES="${REDIS_CLUSTER_NODES:-}"
DATABASE_URL="${DATABASE_URL:-}"
ADMIN_PASSWORD="${ADMIN_PASSWORD:-airflow_admin_$(openssl rand -hex 8)}"
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -n, --nodes Redis cluster node IPs (comma-separated, required)"
echo " -d, --database PostgreSQL connection URL (required)"
echo " -p, --password Admin password (optional, generated if not provided)"
echo " -h, --help Show this help message"
echo ""
echo "Example:"
echo " $0 -n '192.168.1.10,192.168.1.11,192.168.1.12' -d 'postgresql://airflow:pass@192.168.1.20:5432/airflow'"
exit 1
}
cleanup() {
echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
systemctl stop airflow-webserver airflow-scheduler airflow-worker redis-server 2>/dev/null || true
systemctl disable airflow-webserver airflow-scheduler airflow-worker redis-server 2>/dev/null || true
userdel -r "$AIRFLOW_USER" 2>/dev/null || true
rm -rf "$AIRFLOW_HOME" /etc/systemd/system/airflow-*.service 2>/dev/null || true
}
trap cleanup ERR
log_info() {
echo -e "${GREEN}$1${NC}"
}
log_warn() {
echo -e "${YELLOW}$1${NC}"
}
log_error() {
echo -e "${RED}$1${NC}"
}
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
-n|--nodes)
REDIS_CLUSTER_NODES="$2"
shift 2
;;
-d|--database)
DATABASE_URL="$2"
shift 2
;;
-p|--password)
ADMIN_PASSWORD="$2"
shift 2
;;
-h|--help)
usage
;;
*)
echo "Unknown option: $1"
usage
;;
esac
done
# Validate required arguments
if [[ -z "$REDIS_CLUSTER_NODES" || -z "$DATABASE_URL" ]]; then
log_error "Redis cluster nodes and database URL are required"
usage
fi
# Check if running as root
if [[ $EUID -ne 0 ]]; then
log_error "This script must be run as root"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update"
PKG_INSTALL="apt install -y"
REDIS_CONFIG="/etc/redis/redis.conf"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
REDIS_CONFIG="/etc/redis.conf"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
REDIS_CONFIG="/etc/redis.conf"
;;
*)
log_error "Unsupported distro: $ID"
exit 1
;;
esac
else
log_error "Cannot detect distribution"
exit 1
fi
log_info "[1/8] Installing system packages..."
$PKG_UPDATE
if [[ "$PKG_MGR" == "apt" ]]; then
$PKG_INSTALL redis-server redis-tools python3-pip python3-venv postgresql-client build-essential python3-dev
else
$PKG_INSTALL redis python3-pip postgresql gcc python3-devel  # redis-cli ships in the redis package on RHEL-family systems
fi
log_info "[2/8] Creating airflow user..."
useradd -r -m -s /bin/bash -d "$AIRFLOW_HOME" "$AIRFLOW_USER" || true
mkdir -p "$AIRFLOW_HOME"/{dags,logs,plugins}
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME"
chmod 755 "$AIRFLOW_HOME"
chmod 755 "$AIRFLOW_HOME"/{dags,logs,plugins}
log_info "[3/8] Configuring Redis cluster..."
# Parse Redis nodes and create cluster config
IFS=',' read -ra NODES <<< "$REDIS_CLUSTER_NODES"
FIRST_NODE="${NODES[0]}"
# Configure Redis
cat > "$REDIS_CONFIG" << EOF
bind 0.0.0.0
port $REDIS_PORT
cluster-enabled yes
cluster-config-file nodes-${REDIS_PORT}.conf
cluster-node-timeout 15000
cluster-announce-ip $(hostname -I | awk '{print $1}')
cluster-announce-port $REDIS_PORT
appendonly yes
appendfilename "appendonly-${REDIS_PORT}.aof"
maxmemory 2gb
maxmemory-policy allkeys-lru
protected-mode no
EOF
systemctl enable redis-server || systemctl enable redis
systemctl start redis-server || systemctl start redis
log_info "[4/8] Installing Apache Airflow..."
sudo -u "$AIRFLOW_USER" bash << EOF
python3 -m venv "$AIRFLOW_HOME/venv"
source "$AIRFLOW_HOME/venv/bin/activate"
pip install --upgrade pip
pip install "apache-airflow[celery,redis,postgres]==2.8.0"
pip install redis==4.5.5 celery==5.3.4
EOF
log_info "[5/8] Configuring Airflow..."
# Celery's Redis transport does not speak the Redis Cluster protocol,
# so point the broker at a single endpoint (use Redis Sentinel for broker HA).
REDIS_URLS="redis://${FIRST_NODE}:${REDIS_PORT}/0"
sudo -u "$AIRFLOW_USER" bash << EOF
export AIRFLOW_HOME="$AIRFLOW_HOME"
cat > "$AIRFLOW_HOME/airflow.cfg" << EOL
[core]
executor = CeleryExecutor
dags_folder = $AIRFLOW_HOME/dags
load_examples = False
max_active_runs_per_dag = 16
max_active_tasks_per_dag = 16
[database]
sql_alchemy_conn = $DATABASE_URL
[logging]
base_log_folder = $AIRFLOW_HOME/logs
remote_logging = False
[webserver]
web_server_host = 0.0.0.0
web_server_port = 8080
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
secret_key = $(openssl rand -hex 32)
[celery]
broker_url = redis://${FIRST_NODE}:${REDIS_PORT}/0
result_backend = redis://${FIRST_NODE}:${REDIS_PORT}/0
worker_concurrency = 8
worker_log_server_port = 8793
celery_app_name = airflow.executors.celery_executor
worker_precheck = True
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
EOL
chmod 644 "$AIRFLOW_HOME/airflow.cfg"
EOF
log_info "[6/8] Initializing Airflow database..."
sudo -u "$AIRFLOW_USER" bash << EOF
export AIRFLOW_HOME="$AIRFLOW_HOME"
source "$AIRFLOW_HOME/venv/bin/activate"
airflow db migrate
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password "$ADMIN_PASSWORD"
EOF
log_info "[7/8] Creating systemd services..."
# Webserver service
cat > /etc/systemd/system/airflow-webserver.service << EOF
[Unit]
Description=Airflow webserver
After=network.target
[Service]
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
User=$AIRFLOW_USER
Group=$AIRFLOW_USER
Type=simple
ExecStart=$AIRFLOW_HOME/venv/bin/airflow webserver
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
EOF
# Scheduler service
cat > /etc/systemd/system/airflow-scheduler.service << EOF
[Unit]
Description=Airflow scheduler
After=network.target
[Service]
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
User=$AIRFLOW_USER
Group=$AIRFLOW_USER
Type=simple
ExecStart=$AIRFLOW_HOME/venv/bin/airflow scheduler
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
EOF
# Worker service
cat > /etc/systemd/system/airflow-worker.service << EOF
[Unit]
Description=Airflow celery worker
After=network.target
[Service]
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
User=$AIRFLOW_USER
Group=$AIRFLOW_USER
Type=simple
ExecStart=$AIRFLOW_HOME/venv/bin/airflow celery worker
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
EOF
chmod 644 /etc/systemd/system/airflow-*.service
systemctl daemon-reload
log_info "[8/8] Starting Airflow services..."
systemctl enable airflow-webserver airflow-scheduler airflow-worker
systemctl start airflow-webserver airflow-scheduler airflow-worker
# Verify services
sleep 10
if systemctl is-active --quiet airflow-webserver && systemctl is-active --quiet airflow-scheduler && systemctl is-active --quiet airflow-worker; then
log_info "✅ Airflow installation completed successfully!"
log_info "Web UI: http://$(hostname -I | awk '{print $1}'):8080"
log_info "Username: admin"
log_info "Password: $ADMIN_PASSWORD"
else
log_error "❌ Some services failed to start. Check logs with: journalctl -u airflow-*"
exit 1
fi
Review the script before running. Execute as root, for example: sudo bash install.sh -n '203.0.113.10,203.0.113.11,203.0.113.12' -d 'postgresql://airflow:password@203.0.113.20:5432/airflow'