Implement comprehensive security for Apache Airflow DAGs using role-based access control, HashiCorp Vault integration, and encrypted secrets management. Configure granular permissions, audit logging, and isolation policies for production workflows.
Prerequisites
- Existing Apache Airflow installation
- PostgreSQL database
- Root or sudo access
- Basic understanding of RBAC concepts
- HashiCorp Vault knowledge helpful
What this solves
Apache Airflow DAGs often handle sensitive data and credentials, requiring proper security controls to prevent unauthorized access and data breaches. This tutorial configures comprehensive DAG security with RBAC policies, encrypted secrets management through HashiCorp Vault, and audit logging to meet enterprise security requirements.
Prerequisites
You need an existing Apache Airflow installation with PostgreSQL backend. If you don't have this setup, follow our Apache Airflow installation guide first.
This guide assumes a dedicated airflow user with the installation directory at /opt/airflow.
Step-by-step configuration
Install required dependencies
Install the necessary Python packages for Vault integration and enhanced security features. The hashicorp provider package supplies the Vault secrets backend configured later in this guide.
sudo -u airflow pip install 'apache-airflow[password,ldap]'
sudo -u airflow pip install apache-airflow-providers-hashicorp hvac cryptography
Configure RBAC authentication
Enable role-based access control in the Airflow configuration to manage user permissions granularly. On Airflow 2.x the RBAC interface is always enabled and webserver authentication is configured through webserver_config.py, so the authenticate, rbac, and auth_backend keys below only apply to legacy 1.10 installations; the SSL, base_url, and expose_config settings apply to both.
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
rbac = True
expose_config = False
web_server_ssl_cert = /opt/airflow/ssl/airflow.crt
web_server_ssl_key = /opt/airflow/ssl/airflow.key
base_url = https://example.com:8080
[core]
fernet_key = your_32_character_fernet_key_here
security = kerberos
default_timezone = utc
max_active_runs_per_dag = 3
parallelism = 16
dag_concurrency = 8
Generate Fernet encryption key
Create a secure Fernet key for encrypting connection passwords and sensitive data in the database.
FERNET_KEY=$(sudo -u airflow python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
echo "$FERNET_KEY"
sudo sed -i "s/your_32_character_fernet_key_here/${FERNET_KEY}/" /opt/airflow/airflow.cfg
Create SSL certificates
Generate SSL certificates for secure HTTPS communication with the Airflow webserver.
sudo mkdir -p /opt/airflow/ssl
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
-keyout /opt/airflow/ssl/airflow.key \
-out /opt/airflow/ssl/airflow.crt \
-subj "/C=US/ST=State/L=City/O=Organization/CN=example.com"
sudo chown -R airflow:airflow /opt/airflow/ssl
sudo chmod 600 /opt/airflow/ssl/airflow.key
sudo chmod 644 /opt/airflow/ssl/airflow.crt
Configure HashiCorp Vault integration
Set up Vault integration for secure secrets management. First, install and configure Vault if not already done.
wget -O- https://apt.releases.hashicorp.com/gpg | sudo gpg --dearmor -o /usr/share/keyrings/hashicorp-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] https://apt.releases.hashicorp.com $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/hashicorp.list
sudo apt update && sudo apt install -y vault
Configure Vault for Airflow secrets
Configure the Vault server and create a secrets backend for Airflow to retrieve sensitive configuration. Replace the packaged configuration file at /etc/vault.d/vault.hcl with the following (the vault systemd unit shipped with the package reads its configuration from there).
ui = true
disable_mlock = false
storage "file" {
path = "/opt/vault/data"
}
listener "tcp" {
address = "127.0.0.1:8200"
tls_disable = 1
}
api_addr = "http://127.0.0.1:8200"
cluster_addr = "https://127.0.0.1:8201"
sudo mkdir -p /opt/vault/data
sudo chown -R vault:vault /opt/vault
sudo systemctl enable --now vault
export VAULT_ADDR='http://127.0.0.1:8200'
vault operator init -key-shares=5 -key-threshold=3
# Unseal with three of the five unseal keys printed above, then log in with the initial root token
vault operator unseal    # repeat three times, providing a different unseal key each time
vault login
# Enable a KV v2 secrets engine at the mount point Airflow will read from
vault secrets enable -path=airflow kv-v2
Enable secrets backend in Airflow
Configure Airflow to use Vault as the secrets backend for retrieving database connections and variables. Replace your_vault_token_here with a token that can read the airflow mount; in production prefer AppRole or another short-lived auth method over the root token.
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "config_path": "config", "url": "http://127.0.0.1:8200", "token": "your_vault_token_here", "mount_point": "airflow"}
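Before wiring DAGs to these secrets, it helps to confirm the backend actually resolves them. The following is a minimal sketch, run as the airflow user, assuming the airflow KV v2 mount exists, a VAULT_TOKEN environment variable is set, and the variable name matches the one written to Vault later in this guide.
import os

from airflow.providers.hashicorp.secrets.vault import VaultBackend

# Point the client at the same mount point and paths configured in [secrets] above
backend = VaultBackend(
    connections_path="connections",
    variables_path="variables",
    mount_point="airflow",
    url="http://127.0.0.1:8200",
    token=os.environ["VAULT_TOKEN"],
)

# Prints the value stored at airflow/variables/database_password (written later in this guide)
print(backend.get_variable("database_password"))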
Create custom RBAC roles
Define custom roles with specific permissions for different user types and DAG access levels.
from airflow.www.security import AirflowSecurityManager
class CustomSecurityManager(AirflowSecurityManager):
def __init__(self, appbuilder):
super(CustomSecurityManager, self).__init__(appbuilder)
# Define custom roles
self.create_custom_roles()
def create_custom_roles(self):
# Create DAG Developer role
dag_developer_perms = [
('can_read', 'DAG Runs'),
('can_create', 'DAG Runs'),
('can_edit', 'DAG Runs'),
('can_read', 'Task Instances'),
('can_read', 'Logs'),
('can_read', 'ImportError'),
('can_read', 'DAG'),
('can_read', 'Task Reschedule'),
]
# Create DAG Viewer role
dag_viewer_perms = [
('can_read', 'DAG Runs'),
('can_read', 'Task Instances'),
('can_read', 'Logs'),
('can_read', 'DAG'),
]
# Create Production Operator role
prod_operator_perms = [
('can_read', 'DAG Runs'),
('can_create', 'DAG Runs'),
('can_edit', 'DAG Runs'),
('can_delete', 'DAG Runs'),
('can_read', 'Task Instances'),
('can_edit', 'Task Instances'),
('can_read', 'Logs'),
('can_read', 'DAG'),
('can_edit', 'DAG'),
]
self._create_role_if_not_exists('DAG_Developer', dag_developer_perms)
self._create_role_if_not_exists('DAG_Viewer', dag_viewer_perms)
self._create_role_if_not_exists('Production_Operator', prod_operator_perms)
def _create_role_if_not_exists(self, role_name, permissions):
role = self.find_role(role_name)
if not role:
role = self.add_role(role_name)
for perm_name, view_name in permissions:
pvm = self.find_permission_view_menu(perm_name, view_name)
if pvm:
self.add_permission_role(role, pvm)
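The class above does nothing until the webserver is told to load it. Below is a minimal sketch of webserver_config.py, assuming the class is saved as custom_security_manager.py somewhere on the webserver's PYTHONPATH (the file name and location are illustrative); on Airflow 2.x the webserver reads this file from AIRFLOW_HOME.
# /opt/airflow/webserver_config.py
from flask_appbuilder.security.manager import AUTH_DB

# Assumes custom_security_manager.py (the class defined above) is importable by the webserver
from custom_security_manager import CustomSecurityManager

# Database-backed username/password authentication
AUTH_TYPE = AUTH_DB

# Load the custom security manager so the DAG_Developer, DAG_Viewer and
# Production_Operator roles are created when the webserver starts
SECURITY_MANAGER_CLASS = CustomSecurityManager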
Configure DAG-level access control
Implement DAG-level permissions to restrict access to specific workflows based on user roles and tags.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
# DAG with security classifications
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': days_ago(1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Production DAG with restricted access
dag_production = DAG(
'production_etl_pipeline',
default_args=default_args,
description='Production ETL pipeline with restricted access',
schedule_interval=timedelta(hours=6),
access_control={
        'Production_Operator': {'can_read', 'can_edit'},
        'Admin': {'can_read', 'can_edit', 'can_delete'},
},
tags=['production', 'sensitive', 'finance'],
catchup=False,
max_active_runs=1,
is_paused_upon_creation=True,
)
# Development DAG with broader access
dag_development = DAG(
'development_test_pipeline',
default_args=default_args,
description='Development pipeline for testing',
schedule_interval=None,
access_control={
        'DAG_Developer': {'can_read', 'can_edit'},
        'DAG_Viewer': {'can_read'},
        'Production_Operator': {'can_read', 'can_edit'},
        'Admin': {'can_read', 'can_edit', 'can_delete'},
},
tags=['development', 'testing'],
catchup=False,
)
def get_sensitive_data():
    # Retrieve secrets that resolve through the Vault-backed Variables
    db_password = Variable.get("database_password", deserialize_json=False)
    api_key = Variable.get("external_api_key", deserialize_json=False)
    return f"Retrieved secure credentials: {len(db_password)} and {len(api_key)} chars"
# Production tasks
secure_task = PythonOperator(
task_id='fetch_secure_data',
python_callable=get_sensitive_data,
dag=dag_production,
pool='sensitive_pool',
)
data_processing = BashOperator(
task_id='process_sensitive_data',
bash_command='echo "Processing data with security controls"',
dag=dag_production,
pool='sensitive_pool',
)
secure_task >> data_processing
# Development tasks
test_task = BashOperator(
task_id='test_basic_functionality',
bash_command='echo "Running development tests"',
dag=dag_development,
)
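Per-DAG access_control only protects the DAGs that declare it. A cluster policy can enforce the convention for every DAG; the sketch below is one possible policy (the rule itself is an assumption of this guide, while the dag_policy hook and AirflowClusterPolicyViolation are standard Airflow), placed in airflow_local_settings.py on the scheduler's config path. It rejects any DAG tagged production that omits access_control.
# airflow_local_settings.py -- loaded automatically by Airflow when present on the config path
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG


def dag_policy(dag: DAG) -> None:
    """Refuse to load production-tagged DAGs that skip DAG-level access control."""
    tags = set(dag.tags or [])
    if "production" in tags and not dag.access_control:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} is tagged 'production' but defines no access_control; "
            "restrict it to Production_Operator/Admin before deploying."
        )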
Configure resource pools for isolation
Create resource pools to isolate sensitive DAGs and limit concurrent execution of critical workflows.
sudo -u airflow airflow pools set sensitive_pool 2 "Pool for sensitive production DAGs"
sudo -u airflow airflow pools set development_pool 5 "Pool for development and testing DAGs"
sudo -u airflow airflow pools set default_pool 16 "Default pool for general DAGs"
Enable comprehensive audit logging
Configure detailed audit logging to track all user actions, DAG executions, and security events.
[logging]
logging_level = INFO
fab_logging_level = WARN
logging_config_class = airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG
remote_logging = True
remote_log_conn_id = aws_s3_logs
remote_base_log_folder = s3://airflow-logs/
encrypt_s3_logs = True
[webserver]
audit_view_excluded_events = login,logout
audit_view_include_ids = True
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflow
load_examples = False
dag_dir_list_interval = 300
dag_discovery_safe_mode = True
default_task_retries = 1
killed_task_cleanup_time = 60
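With these settings the webserver keeps recording user actions in the audit log table of the metadata database. The snippet below is a short sketch of pulling recent entries for a security review; the 90-minute window and the printed fields are illustrative choices.
from datetime import timedelta

from airflow.models.log import Log
from airflow.utils import timezone
from airflow.utils.session import create_session

# Query audit-log entries recorded in the last 90 minutes
with create_session() as session:
    cutoff = timezone.utcnow() - timedelta(minutes=90)
    recent_events = (
        session.query(Log)
        .filter(Log.dttm >= cutoff)
        .order_by(Log.dttm.desc())
        .all()
    )
    for event in recent_events:
        print(event.dttm, event.owner, event.event, event.dag_id, event.extra)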
Configure database connection encryption
Ensure database connections use SSL encryption and store credentials securely in Vault.
# Store database connection in Vault
vault kv put airflow/connections/postgres_default \
conn_type=postgres \
host=localhost \
schema=airflow \
login=airflow \
password=secure_password_here \
port=5432 \
extra='{"sslmode": "require", "sslcert": "/opt/airflow/ssl/client.crt", "sslkey": "/opt/airflow/ssl/client.key", "sslrootcert": "/opt/airflow/ssl/ca.crt"}'
# Store API keys and sensitive variables
vault kv put airflow/variables/database_password value=secure_db_password
vault kv put airflow/variables/external_api_key value=api_key_12345
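To confirm the secrets landed where the Airflow backend expects them, read one back with the hvac client. This is a minimal sketch assuming a VAULT_TOKEN environment variable holds a token with read access to the airflow mount.
import os

import hvac

client = hvac.Client(url="http://127.0.0.1:8200", token=os.environ["VAULT_TOKEN"])

# KV v2 read: mount_point matches backend_kwargs, path is <variables_path>/<variable key>
secret = client.secrets.kv.v2.read_secret_version(
    path="variables/database_password",
    mount_point="airflow",
)
print(secret["data"]["data"]["value"])  # the value Airflow's Variable.get() will return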
Create user accounts with RBAC
Set up user accounts with appropriate role assignments for different access levels.
# Create admin user
sudo -u airflow airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password secure_admin_password
# Create production operator
sudo -u airflow airflow users create \
--username prodops \
--firstname Production \
--lastname Operator \
--role Production_Operator \
--email prodops@example.com \
--password secure_prod_password
# Create DAG developer
sudo -u airflow airflow users create \
--username devuser \
--firstname DAG \
--lastname Developer \
--role DAG_Developer \
--email devuser@example.com \
--password secure_dev_password
# Create read-only viewer
sudo -u airflow airflow users create \
--username viewer \
--firstname DAG \
--lastname Viewer \
--role DAG_Viewer \
--email viewer@example.com \
--password secure_viewer_password
Configure firewall rules
Set up firewall rules to restrict access to Airflow services and allow only necessary connections.
sudo ufw allow 8080/tcp comment "Airflow Webserver HTTPS"
sudo ufw allow from 10.0.0.0/8 to any port 8793 comment "Airflow Worker"
sudo ufw allow from 127.0.0.1 to any port 8200 comment "Vault Local"
sudo ufw reload
Start secured Airflow services
Restart Airflow services with the new security configuration and verify all components are running correctly.
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
sudo systemctl restart airflow-worker
sudo systemctl status airflow-webserver airflow-scheduler
Verify your setup
Test the security configuration by verifying RBAC permissions, SSL encryption, and secrets management.
# Check Airflow is running with SSL
curl -k https://localhost:8080/health
# Verify Vault integration
sudo -u airflow airflow connections list
sudo -u airflow airflow variables list
# Test RBAC by logging in with different user roles
# Navigate to https://localhost:8080 in your browser
# Verify audit logging
sudo tail -f /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
sudo tail -f /var/log/syslog | grep airflow
Configure monitoring and alerting
Set up monitoring for security events and failed authentication attempts. This integrates with existing monitoring solutions like Prometheus for comprehensive observability.
import logging
from airflow.models import DagRun, TaskInstance
from airflow.utils.email import send_email
from datetime import datetime, timedelta
# Configure security event logging
security_logger = logging.getLogger('airflow.security')
handler = logging.FileHandler('/opt/airflow/logs/security.log')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
security_logger.addHandler(handler)
security_logger.setLevel(logging.INFO)
def log_security_event(event_type, user, details):
"""Log security events for audit trail"""
security_logger.info(f"SECURITY_EVENT: {event_type} | USER: {user} | DETAILS: {details}")
def check_failed_logins():
"""Monitor for excessive failed login attempts"""
# Implementation depends on your authentication backend
# This is a basic example for database-backed authentication
pass
def monitor_dag_modifications():
"""Alert on unauthorized DAG modifications"""
# Check for DAG file modifications outside of approved processes
pass
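To turn these helpers into actionable alerts, you can wire log_security_event into a task failure callback attached through default_args. The sketch below assumes SMTP is already configured for send_email and uses a placeholder recipient address.
def security_failure_callback(context):
    """Log and email when a task in a security-sensitive DAG fails."""
    ti = context["task_instance"]
    log_security_event(
        "TASK_FAILURE",
        context["dag"].owner,
        f"{ti.dag_id}.{ti.task_id} failed on {context.get('execution_date')}",
    )
    send_email(
        to=["security-team@example.com"],  # placeholder recipient
        subject=f"[Airflow security] Task failure in {ti.dag_id}",
        html_content=f"Task {ti.task_id} failed; review /opt/airflow/logs/security.log for details.",
    )

# Attach to sensitive DAGs, e.g. default_args={'on_failure_callback': security_failure_callback, ...}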
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| RBAC permissions not working | Custom security manager not loaded | Set SECURITY_MANAGER_CLASS = CustomSecurityManager in webserver_config.py and restart the webserver |
| Vault connection failed | Token expired or wrong mount point | Verify the token with vault token lookup and confirm the airflow mount exists with vault secrets list |
| SSL certificate errors | Self-signed certificate not trusted | Add certificate to system trust store or use proper CA-signed cert |
| DAG access denied | User role doesn't have required permissions | Check role assignment: airflow users list |
| Fernet key encryption errors | Key changed after connections created | Regenerate connections or use same key across environments |
| Pool slots exhausted | Too many concurrent tasks in sensitive pool | Increase pool size: airflow pools set sensitive_pool 4 "Pool for sensitive production DAGs" |
Next steps
- Configure LDAP authentication for enterprise users
- Set up comprehensive DAG monitoring and alerting
- Configure Airflow high availability clustering
- Optimize Airflow performance for production workloads
- Advanced Vault integration with Kubernetes
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Default configuration
AIRFLOW_HOME="${AIRFLOW_HOME:-/opt/airflow}"
AIRFLOW_USER="airflow"
DOMAIN="${1:-localhost}"
VAULT_VERSION="1.15.4"
# Usage message
usage() {
echo "Usage: $0 [domain] [airflow_home]"
echo "Example: $0 airflow.example.com /opt/airflow"
exit 1
}
# Logging functions
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
# Cleanup function for rollback
cleanup() {
log_error "Installation failed! Rolling back..."
systemctl stop vault 2>/dev/null || true
systemctl stop airflow-webserver 2>/dev/null || true
systemctl stop airflow-scheduler 2>/dev/null || true
rm -rf /opt/vault 2>/dev/null || true
rm -rf ${AIRFLOW_HOME}/ssl 2>/dev/null || true
}
trap cleanup ERR
# Check prerequisites
check_prerequisites() {
if [[ $EUID -ne 0 ]]; then
log_error "This script must be run as root"
exit 1
fi
if ! id "$AIRFLOW_USER" &>/dev/null; then
log_error "Airflow user '$AIRFLOW_USER' does not exist. Please install Airflow first."
exit 1
fi
if [[ ! -d "$AIRFLOW_HOME" ]]; then
log_error "Airflow directory '$AIRFLOW_HOME' does not exist"
exit 1
fi
}
# Detect distribution and set package manager
detect_distro() {
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
FIREWALL_CMD="ufw"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
FIREWALL_CMD="firewall-cmd"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
FIREWALL_CMD="firewall-cmd"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
else
log_error "Cannot detect distribution"
exit 1
fi
log_info "Detected distribution: $ID, using $PKG_MGR"
}
# Install system dependencies
install_dependencies() {
echo "[1/8] Installing system dependencies..."
$PKG_UPDATE
case "$PKG_MGR" in
apt)
$PKG_INSTALL python3-pip openssl wget gnupg lsb-release
;;
dnf|yum)
$PKG_INSTALL python3-pip openssl wget gnupg2 yum-utils
;;
esac
}
# Install Python packages
install_python_packages() {
echo "[2/8] Installing Python packages..."
sudo -u $AIRFLOW_USER pip install 'apache-airflow[password,ldap]' apache-airflow-providers-hashicorp hvac cryptography
}
# Generate Fernet key and update Airflow config
configure_airflow_security() {
echo "[3/8] Configuring Airflow security settings..."
# Generate Fernet key
FERNET_KEY=$(sudo -u $AIRFLOW_USER python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
# Backup original config
cp ${AIRFLOW_HOME}/airflow.cfg ${AIRFLOW_HOME}/airflow.cfg.backup
# Update airflow.cfg with security settings
cat > /tmp/airflow_security.conf << EOF
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
rbac = True
expose_config = False
web_server_ssl_cert = ${AIRFLOW_HOME}/ssl/airflow.crt
web_server_ssl_key = ${AIRFLOW_HOME}/ssl/airflow.key
base_url = https://${DOMAIN}:8080
[core]
fernet_key = ${FERNET_KEY}
security = kerberos
default_timezone = utc
max_active_runs_per_dag = 3
parallelism = 16
dag_concurrency = 8
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "airflow", "url": "http://127.0.0.1:8200"}
EOF
# Merge the security settings into airflow.cfg. Appending would duplicate the
# [webserver]/[core] section headers, which Airflow's config parser rejects, so
# merge key by key instead (comments in airflow.cfg are not preserved; the
# original file is kept at airflow.cfg.backup).
sudo -u $AIRFLOW_USER python3 - "${AIRFLOW_HOME}/airflow.cfg" /tmp/airflow_security.conf << 'PYEOF'
import sys
from configparser import ConfigParser

cfg_path, overlay_path = sys.argv[1], sys.argv[2]
cfg = ConfigParser(interpolation=None)
cfg.read(cfg_path)
overlay = ConfigParser(interpolation=None)
overlay.read(overlay_path)
for section in overlay.sections():
    if not cfg.has_section(section):
        cfg.add_section(section)
    for key, value in overlay.items(section):
        cfg.set(section, key, value)
with open(cfg_path, "w") as fh:
    cfg.write(fh)
PYEOF
chown $AIRFLOW_USER:$AIRFLOW_USER ${AIRFLOW_HOME}/airflow.cfg
chmod 600 ${AIRFLOW_HOME}/airflow.cfg
rm /tmp/airflow_security.conf
}
# Create SSL certificates
create_ssl_certificates() {
echo "[4/8] Creating SSL certificates..."
mkdir -p ${AIRFLOW_HOME}/ssl
openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
-keyout ${AIRFLOW_HOME}/ssl/airflow.key \
-out ${AIRFLOW_HOME}/ssl/airflow.crt \
-subj "/C=US/ST=State/L=City/O=Organization/CN=${DOMAIN}"
chown -R $AIRFLOW_USER:$AIRFLOW_USER ${AIRFLOW_HOME}/ssl
chmod 700 ${AIRFLOW_HOME}/ssl
chmod 600 ${AIRFLOW_HOME}/ssl/airflow.key
chmod 644 ${AIRFLOW_HOME}/ssl/airflow.crt
}
# Install HashiCorp Vault
install_vault() {
echo "[5/8] Installing HashiCorp Vault..."
case "$PKG_MGR" in
apt)
wget -O- https://apt.releases.hashicorp.com/gpg | gpg --dearmor -o /usr/share/keyrings/hashicorp-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] https://apt.releases.hashicorp.com $(lsb_release -cs) main" > /etc/apt/sources.list.d/hashicorp.list
apt update
$PKG_INSTALL vault
;;
dnf|yum)
$PKG_INSTALL yum-utils
yum-config-manager --add-repo https://rpm.releases.hashicorp.com/RHEL/hashicorp.repo
$PKG_INSTALL vault
;;
esac
# Create vault user if not exists
if ! id vault &>/dev/null; then
useradd --system --home /opt/vault --shell /bin/false vault
fi
}
# Configure Vault
configure_vault() {
echo "[6/8] Configuring HashiCorp Vault..."
mkdir -p /opt/vault/data
mkdir -p /etc/vault.d
cat > /etc/vault.d/vault.conf << EOF
ui = true
disable_mlock = false
storage "file" {
path = "/opt/vault/data"
}
listener "tcp" {
address = "127.0.0.1:8200"
tls_disable = 1
}
api_addr = "http://127.0.0.1:8200"
cluster_addr = "https://127.0.0.1:8201"
EOF
chown -R vault:vault /opt/vault
chown vault:vault /etc/vault.d/vault.conf
chmod 640 /etc/vault.d/vault.conf
# Create systemd service
cat > /etc/systemd/system/vault.service << EOF
[Unit]
Description=HashiCorp Vault
Documentation=https://www.vaultproject.io/docs/
Requires=network-online.target
After=network-online.target
ConditionFileNotEmpty=/etc/vault.d/vault.conf
[Service]
Type=notify
User=vault
Group=vault
ProtectSystem=full
ProtectHome=read-only
PrivateTmp=yes
PrivateDevices=yes
SecureBits=keep-caps
AmbientCapabilities=CAP_IPC_LOCK
NoNewPrivileges=yes
ExecStart=/usr/bin/vault server -config=/etc/vault.d/vault.conf
ExecReload=/bin/kill -HUP \$MAINPID
KillMode=process
Restart=on-failure
RestartSec=5
TimeoutStopSec=30
StartLimitBurst=3
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable vault
systemctl start vault
}
# Configure firewall
configure_firewall() {
echo "[7/8] Configuring firewall..."
case "$FIREWALL_CMD" in
ufw)
if command -v ufw &>/dev/null; then
ufw allow 8080/tcp comment "Airflow HTTPS"
# Vault listens on 127.0.0.1 only; port 8200 stays closed to the network
fi
;;
firewall-cmd)
if command -v firewall-cmd &>/dev/null; then
firewall-cmd --permanent --add-port=8080/tcp
# Vault listens on 127.0.0.1 only; port 8200 stays closed to the network
firewall-cmd --reload
fi
;;
esac
}
# Verify installation
verify_installation() {
echo "[8/8] Verifying installation..."
# Check Vault status
if systemctl is-active --quiet vault; then
log_info "✓ Vault is running"
else
log_error "✗ Vault is not running"
return 1
fi
# Check SSL certificates
if [[ -f "${AIRFLOW_HOME}/ssl/airflow.crt" && -f "${AIRFLOW_HOME}/ssl/airflow.key" ]]; then
log_info "✓ SSL certificates created"
else
log_error "✗ SSL certificates missing"
return 1
fi
# Check Fernet key in config
if grep -q "fernet_key" ${AIRFLOW_HOME}/airflow.cfg; then
log_info "✓ Fernet key configured"
else
log_error "✗ Fernet key not found in configuration"
return 1
fi
log_info "Airflow security configuration completed successfully!"
log_warn "Remember to:"
log_warn "1. Initialize Vault: export VAULT_ADDR='http://127.0.0.1:8200' && vault operator init"
log_warn "2. Restart Airflow services to apply new configuration"
log_warn "3. Access Airflow at https://${DOMAIN}:8080"
}
# Main execution
main() {
log_info "Starting Airflow security configuration..."
check_prerequisites
detect_distro
install_dependencies
install_python_packages
configure_airflow_security
create_ssl_certificates
install_vault
configure_vault
configure_firewall
verify_installation
log_info "Installation completed successfully!"
}
# Handle arguments
if [[ $# -gt 2 ]]; then
usage
fi
if [[ $# -ge 2 ]]; then
AIRFLOW_HOME="$2"
fi
main "$@"
Review the script before running it. It must be executed as root, for example: sudo bash install.sh airflow.example.com