Configure comprehensive security for Apache Airflow with role-based access control, DAG-level permissions, data lineage tracking, and audit logging for compliance monitoring in production environments.
Prerequisites
- Apache Airflow 2.0+ installed
- PostgreSQL database configured
- Python 3.8+ with pip
- Root or sudo access
What this solves
Apache Airflow's default configuration provides minimal security controls, leaving DAGs and data accessible to all users. This tutorial implements enterprise-grade security with role-based access control (RBAC), DAG-level permissions, data governance through lineage tracking, and comprehensive audit logging. You'll secure workflow access, track data transformations, and maintain compliance audit trails for production environments.
Step-by-step configuration
Enable RBAC authentication
Configure Airflow to use RBAC with database-backed authentication instead of the default authentication.
[webserver]
# NOTE(review): `rbac`, `authenticate` and the contrib password backend are
# Airflow 1.10 options; in Airflow 2.x RBAC is always on and these keys are
# ignored — confirm against your exact version.
rbac = True
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
[api]
# Require basic-auth credentials for every REST API call.
auth_backend = airflow.api.auth.backend.basic_auth
Initialize RBAC database tables
Create the necessary database tables for RBAC user management and permissions.
# Activate the Airflow virtualenv and bootstrap the metadata database.
cd /opt/airflow
source venv/bin/activate
# Creates/upgrades all metadata tables, including the FAB RBAC tables.
airflow db init
# Bootstrap the first administrator account (prompts for a password).
airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com
Configure custom roles and permissions
Create a Python script to define custom roles with specific DAG and task permissions.
from airflow import settings
from airflow.models import DagBag
from airflow.www.app import cached_app
from flask_appbuilder.security.sqla.models import Role, Permission, ViewMenu
from flask_appbuilder.security.sqla.manager import SecurityManager
def create_custom_roles():
    """Create the DataEngineer and DataAnalyst roles and grant their
    view permissions through the Flask-AppBuilder security manager."""
    app = cached_app()
    with app.app_context():
        sm = app.appbuilder.sm

        def ensure_role(name):
            # Idempotent: reuse the role when a previous run created it.
            return sm.find_role(name) or sm.add_role(name)

        data_engineer_role = ensure_role('DataEngineer')
        data_analyst_role = ensure_role('DataAnalyst')

        # Engineers may read and modify DAGs/task instances; analysts are
        # read-only. Specs use the "<action> on <resource>" shorthand.
        grants = [
            (data_engineer_role, [
                'can_read on DAGs',
                'can_edit on DAGs',
                'can_read on Task Instances',
                'can_create on Task Instances',
                'can_read on Log',
                'menu_access on Browse',
            ]),
            (data_analyst_role, [
                'can_read on DAGs',
                'can_read on Task Instances',
                'can_read on Log',
                'menu_access on Browse',
            ]),
        ]
        for role, specs in grants:
            for spec in specs:
                action, resource = spec.split(' on ')
                perm = sm.find_permission_view_menu(action, resource)
                if perm:
                    sm.add_permission_role(role, perm)


if __name__ == '__main__':
    create_custom_roles()
Implement DAG-level access control
Configure DAG-level permissions to restrict access based on user roles and DAG tags.
[webserver]
# Show users only DAGs they own (legacy 1.10 option; Airflow 2.x handles
# this through per-DAG permissions instead — TODO confirm for your version).
filter_by_owner = True
# NOTE(review): a wildcard CORS origin is permissive for a security setup;
# restrict this to your UI origin(s) in production.
access_control_allow_origins = *
[core]
# Persist DAG source and serialized DAGs so the webserver can render them
# without parsing DAG files itself.
store_dag_code = True
store_serialized_dags = True
Create DAG access control decorator
Implement a decorator to control DAG access based on user permissions and tags.
from functools import wraps
from airflow.models import Variable
from airflow.exceptions import AirflowException
from airflow.utils.db import provide_session
from airflow.models.dagbag import DagBag
from flask_login import current_user
def dag_access_control(allowed_roles=None, required_tags=None):
    """Decorate a DAG-factory function with role and tag enforcement.

    Args:
        allowed_roles: role names permitted to invoke the factory from a
            web context. ``None`` disables the role check.
        required_tags: tags merged into ``dag.tags`` on the returned DAG.

    Raises:
        AirflowException: when a resolvable logged-in user holds none of
            the allowed roles.

    Note:
        ``current_user`` is a Flask-Login proxy and is unavailable outside
        a request/app context — e.g. while the scheduler parses DAG files,
        where touching it raises RuntimeError. The role check is therefore
        only enforced when the user's roles can actually be resolved, so
        the DAG still loads for the scheduler (this was the bug fixed here).
    """
    def decorator(dag_function):
        @wraps(dag_function)
        def wrapper(*args, **kwargs):
            if allowed_roles:
                try:
                    roles_attr = current_user.roles
                except Exception:
                    # No request context (scheduler parse) or anonymous
                    # user without a `roles` attribute: skip enforcement.
                    roles_attr = None
                if roles_attr is not None:
                    user_roles = [role.name for role in roles_attr]
                    if not any(role in allowed_roles for role in user_roles):
                        raise AirflowException(f"Access denied. Required roles: {allowed_roles}")
            dag = dag_function(*args, **kwargs)
            # Stamp the DAG with access-control tags, preserving existing ones.
            if required_tags:
                existing_tags = set(dag.tags or [])
                dag.tags = list(existing_tags.union(set(required_tags)))
            return dag
        return wrapper
    return decorator
class DagPermissionChecker:
    """Utility checks for whether a user may access a given DAG."""

    @staticmethod
    @provide_session
    def can_access_dag(dag_id, user, session=None):
        """Return True when *user* may access the DAG *dag_id*.

        Access is granted when the user owns the DAG, holds the Admin
        role, or has at least one of the DAG's tags listed in the
        ``dag_access_<username>`` Airflow Variable (a JSON list).
        """
        dag_bag = DagBag()
        dag = dag_bag.get_dag(dag_id)
        if not dag:
            return False
        # Owner-based access: the DAG owner may always see it.
        if hasattr(dag, 'owner') and dag.owner == user.username:
            return True
        # Role-based access: Admins may see everything.
        if hasattr(user, 'roles'):
            user_roles = [role.name for role in user.roles]
            if 'Admin' in user_roles:
                return True
        # Tag-based access via a per-user allow-list Variable.
        if dag.tags:
            try:
                allowed_tags = Variable.get(
                    f"dag_access_{user.username}",
                    default_var=None,
                    deserialize_json=True,
                )
            except Exception:
                # Variable exists but is not valid JSON: treat as no grant.
                allowed_tags = None
            # BUG FIX: the previous default of "" was returned un-deserialized
            # when the Variable was missing, so `tag in allowed_tags`
            # degenerated into substring matching on a string.
            if isinstance(allowed_tags, (list, tuple, set)):
                if any(tag in allowed_tags for tag in dag.tags):
                    return True
        return False
Enable audit logging
Configure comprehensive audit logging to track all user actions and system events.
[logging]
logging_level = INFO
fab_logging_level = INFO
# Doubled %% and {{ }} are required: configparser would otherwise try to
# interpolate the log-record placeholders.
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
logging_config_class = airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG
[metrics]
# Ship scheduler/webserver metrics to a local StatsD agent.
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
Create audit logging plugin
Implement a custom plugin to capture detailed audit events for compliance tracking.
import json
import logging
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.base_hook import BaseHook
from flask import request, g
from flask_login import current_user
class AuditLogger:
    """JSON-lines audit logger writing to /opt/airflow/logs/audit.log."""

    def __init__(self):
        self.logger = logging.getLogger('airflow.audit')
        # BUG FIX: the named logger is shared process-wide; the original
        # attached a new FileHandler on every AuditLogger() instantiation,
        # duplicating each audit line. Attach the handler only once.
        if not self.logger.handlers:
            handler = logging.FileHandler('/opt/airflow/logs/audit.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        # Keep audit records out of the general Airflow log handlers.
        self.logger.propagate = False

    def _audit_user(self):
        """Best-effort username; 'anonymous' when no user can be resolved."""
        try:
            return getattr(current_user, 'username', 'anonymous')
        except RuntimeError:
            # Flask-Login proxy raises outside a request/app context.
            return 'anonymous'

    def _client_ip(self):
        """Best-effort client IP; 'system' outside a Flask request context."""
        try:
            # BUG FIX: evaluating the `request` proxy outside a request
            # context raises RuntimeError rather than being falsy, so the
            # original `if request` guard crashed for system-side events.
            return request.remote_addr or 'system'
        except RuntimeError:
            return 'system'

    def log_event(self, event_type, details):
        """Emit one structured audit event as a JSON line.

        Args:
            event_type: short machine-readable event name.
            details: JSON-serializable payload describing the event.
        """
        audit_event = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'user': self._audit_user(),
            'ip_address': self._client_ip(),
            'details': details
        }
        self.logger.info(json.dumps(audit_event))
class AuditHook(BaseHook):
    """Convenience hook that forwards structured events to the audit log."""

    def __init__(self):
        super().__init__()
        self.audit_logger = AuditLogger()

    def log_dag_run(self, dag_id, run_id, state):
        """Record a DAG-run state change."""
        payload = {'dag_id': dag_id, 'run_id': run_id, 'state': state}
        self.audit_logger.log_event('dag_run', payload)

    def log_task_instance(self, dag_id, task_id, execution_date, state):
        """Record a task-instance state change."""
        payload = {
            'dag_id': dag_id,
            'task_id': task_id,
            'execution_date': execution_date.isoformat(),
            'state': state,
        }
        self.audit_logger.log_event('task_instance', payload)

    def log_connection_access(self, conn_id):
        """Record that a named connection was read."""
        self.audit_logger.log_event('connection_access', {'connection_id': conn_id})

    def log_variable_access(self, key, action):
        """Record a read/write of an Airflow Variable."""
        payload = {'variable_key': key, 'action': action}
        self.audit_logger.log_event('variable_access', payload)
class AuditOperator(BaseOperator):
    """Operator that emits a single audit event when it runs.

    Args:
        event_type: machine-readable event name for the audit record.
        details: extra JSON-serializable payload merged into the event.
    """

    @apply_defaults
    def __init__(self, event_type, details=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event_type = event_type
        self.details = details or {}
        # BUG FIX: the hook (and its FileHandler on the audit log) was
        # created here, i.e. every time the scheduler parsed the DAG file.
        # Defer it to execute() so parsing stays side-effect free.
        self.audit_hook = None

    def execute(self, context):
        """Write the audit event enriched with the task's run context."""
        if self.audit_hook is None:
            self.audit_hook = AuditHook()
        self.audit_hook.audit_logger.log_event(self.event_type, {
            **self.details,
            'dag_id': context['dag'].dag_id,
            'task_id': context['task'].task_id,
            'execution_date': context['execution_date'].isoformat()
        })
class AuditPlugin(AirflowPlugin):
    """Registers the audit operator and hook with Airflow's plugin manager."""
    name = 'audit_plugin'
    operators = [AuditOperator]
    # NOTE(review): Airflow 2.x ignores the `hooks` plugin field (hooks are
    # plain importable classes); harmless, kept for documentation value.
    hooks = [AuditHook]
Configure data lineage tracking
Set up data lineage tracking to monitor data flow and transformations across DAGs.
import json
from typing import List, Dict, Any
from airflow.models import BaseOperator, XCom
from airflow.utils.decorators import apply_defaults
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
class DataLineageTracker:
    """Persists dataset-level lineage records in the `lineage_db` Postgres DB.

    NOTE(review): the constructor connects to the database and runs DDL;
    avoid instantiating it at DAG parse time (see LineageOperator).
    """

    def __init__(self):
        self.postgres_hook = PostgresHook(postgres_conn_id='lineage_db')
        self._ensure_lineage_tables()

    def _ensure_lineage_tables(self):
        """Create the lineage tables and indexes if they do not exist."""
        create_tables_sql = """
        CREATE TABLE IF NOT EXISTS data_lineage (
            id SERIAL PRIMARY KEY,
            dag_id VARCHAR(250) NOT NULL,
            task_id VARCHAR(250) NOT NULL,
            execution_date TIMESTAMP NOT NULL,
            source_datasets TEXT[],
            target_datasets TEXT[],
            transformation_type VARCHAR(100),
            metadata JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS dataset_registry (
            id SERIAL PRIMARY KEY,
            dataset_name VARCHAR(500) UNIQUE NOT NULL,
            dataset_type VARCHAR(100),
            schema_info JSONB,
            owner VARCHAR(250),
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX IF NOT EXISTS idx_lineage_dag_task
        ON data_lineage(dag_id, task_id);
        CREATE INDEX IF NOT EXISTS idx_lineage_execution_date
        ON data_lineage(execution_date);
        """
        self.postgres_hook.run(create_tables_sql)

    def track_lineage(self, dag_id: str, task_id: str, execution_date: datetime,
                      source_datasets: List[str], target_datasets: List[str],
                      transformation_type: str, metadata: Dict[str, Any] = None):
        """Insert one lineage record for a task execution.

        Python lists are adapted to Postgres TEXT[] by the driver; metadata
        is stored as JSONB.
        """
        insert_sql = """
        INSERT INTO data_lineage
        (dag_id, task_id, execution_date, source_datasets, target_datasets,
         transformation_type, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        self.postgres_hook.run(insert_sql, parameters=(
            dag_id, task_id, execution_date,
            source_datasets, target_datasets,
            transformation_type, json.dumps(metadata or {})
        ))

    def register_dataset(self, dataset_name: str, dataset_type: str,
                         schema_info: Dict[str, Any], owner: str, description: str = None):
        """Upsert a dataset into the registry, refreshing its schema info."""
        insert_sql = """
        INSERT INTO dataset_registry
        (dataset_name, dataset_type, schema_info, owner, description)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (dataset_name)
        DO UPDATE SET
            schema_info = EXCLUDED.schema_info,
            updated_at = CURRENT_TIMESTAMP
        """
        self.postgres_hook.run(insert_sql, parameters=(
            dataset_name, dataset_type, json.dumps(schema_info), owner, description
        ))

    def get_lineage_graph(self, dataset_name: str, depth: int = 3):
        """Get upstream and downstream lineage for a dataset.

        Walks the lineage recursively up to *depth* hops in both directions.
        """
        # BUG FIX: the original used array_overlap(a, b), which is not a
        # PostgreSQL function; the array-overlap operator is `&&`.
        query_sql = """
        WITH RECURSIVE lineage_tree AS (
            -- Base case: rows that directly touch the dataset
            SELECT dag_id, task_id, source_datasets, target_datasets, 0 AS level
            FROM data_lineage
            WHERE %s = ANY(source_datasets) OR %s = ANY(target_datasets)
            UNION ALL
            -- Recursive case: expand through overlapping dataset arrays
            SELECT dl.dag_id, dl.task_id, dl.source_datasets, dl.target_datasets, lt.level + 1
            FROM data_lineage dl
            JOIN lineage_tree lt ON (
                dl.target_datasets && lt.source_datasets OR
                dl.source_datasets && lt.target_datasets
            )
            WHERE lt.level < %s
        )
        SELECT DISTINCT * FROM lineage_tree ORDER BY level;
        """
        result = self.postgres_hook.get_records(query_sql, parameters=(
            dataset_name, dataset_name, depth
        ))
        return result
class LineageOperator(BaseOperator):
    """Operator that records one lineage event for its task run.

    Args:
        source_datasets: fully-qualified names of datasets read.
        target_datasets: fully-qualified names of datasets written.
        transformation_type: e.g. 'extract', 'transform', 'load'.
        metadata: extra JSON-serializable context stored with the record.
    """

    @apply_defaults
    def __init__(self, source_datasets: List[str], target_datasets: List[str],
                 transformation_type: str, metadata: Dict[str, Any] = None,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.source_datasets = source_datasets
        self.target_datasets = target_datasets
        self.transformation_type = transformation_type
        self.metadata = metadata or {}
        # BUG FIX: the tracker was built here, opening a DB connection and
        # running DDL every time the scheduler parsed the DAG file. Defer
        # it to execute() so parsing stays side-effect free.
        self.lineage_tracker = None

    def execute(self, context):
        """Persist the configured lineage record using the run context."""
        if self.lineage_tracker is None:
            self.lineage_tracker = DataLineageTracker()
        self.lineage_tracker.track_lineage(
            context['dag'].dag_id,
            context['task'].task_id,
            context['execution_date'],
            self.source_datasets,
            self.target_datasets,
            self.transformation_type,
            self.metadata
        )
Create secure DAG template
Implement a template DAG that demonstrates security policies, audit logging, and lineage tracking.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from utils.security import dag_access_control
from utils.lineage import LineageOperator, DataLineageTracker
from plugins.audit_plugin import AuditOperator, AuditHook
Security configuration
# Shared operator defaults for the secure ETL DAG.
default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    # Email the owner on task failure, but not on automatic retries.
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
Apply access control decorator
@dag_access_control(
    allowed_roles=['Admin', 'DataEngineer'],
    required_tags=['etl', 'production', 'sensitive']
)
def create_secure_etl_dag():
    """Factory for the `secure_etl_example` DAG.

    Builds a daily ETL pipeline instrumented end-to-end with audit events
    (AuditOperator / AuditHook) and dataset lineage records
    (LineageOperator / DataLineageTracker). The decorator enforces the
    Admin/DataEngineer role requirement and stamps access-control tags
    onto the returned DAG.
    """
    dag = DAG(
        'secure_etl_example',
        default_args=default_args,
        description='Secure ETL DAG with audit logging and lineage tracking',
        schedule_interval='@daily',
        catchup=False,
        tags=['etl', 'production', 'sensitive'],
        # Serialize runs so audit/lineage records stay in order.
        max_active_runs=1,
        doc_md="""
        # Secure ETL Pipeline
        This DAG demonstrates:
        - Role-based access control
        - Data lineage tracking
        - Audit logging
        - Secure data processing
        Access: DataEngineer role required
        Data Classification: Sensitive
        """
    )
    # Audit task start
    audit_start = AuditOperator(
        task_id='audit_pipeline_start',
        event_type='pipeline_start',
        details={'classification': 'sensitive'},
        dag=dag
    )
    # Data extraction with lineage tracking
    extract_data = LineageOperator(
        task_id='extract_customer_data',
        source_datasets=['production.customers', 'production.orders'],
        target_datasets=['staging.customer_extract'],
        transformation_type='extract',
        metadata={
            'extraction_method': 'incremental',
            'filter_conditions': 'created_date > CURRENT_DATE - 1',
            'row_count_estimate': 10000
        },
        dag=dag
    )

    def secure_transform(**context):
        """Secure data transformation with audit logging"""
        audit_hook = AuditHook()
        lineage_tracker = DataLineageTracker()
        try:
            # Log data access
            audit_hook.log_connection_access('production_db')
            # Simulate secure data transformation
            # In real implementation, add encryption, masking, etc.
            transformation_metadata = {
                'records_processed': 9500,
                'pii_fields_masked': ['email', 'phone'],
                'encryption_applied': True
            }
            # Track transformation lineage
            lineage_tracker.track_lineage(
                context['dag'].dag_id,
                context['task'].task_id,
                context['execution_date'],
                ['staging.customer_extract'],
                ['warehouse.customer_clean'],
                'transform',
                transformation_metadata
            )
            # Log successful completion
            audit_hook.audit_logger.log_event('data_transformation', {
                'status': 'success',
                **transformation_metadata
            })
        except Exception as e:
            # Record the failure in the audit trail, then re-raise so the
            # task is still marked failed and retried per default_args.
            audit_hook.audit_logger.log_event('data_transformation', {
                'status': 'failed',
                'error': str(e)
            })
            raise

    transform_data = PythonOperator(
        task_id='transform_customer_data',
        python_callable=secure_transform,
        dag=dag
    )
    # Data loading with lineage
    load_data = LineageOperator(
        task_id='load_customer_data',
        source_datasets=['warehouse.customer_clean'],
        target_datasets=['analytics.customer_mart'],
        transformation_type='load',
        metadata={
            'load_method': 'upsert',
            'target_table': 'customer_mart',
            'business_keys': ['customer_id']
        },
        dag=dag
    )

    # Data quality check
    def data_quality_check(**context):
        """Perform data quality checks with audit logging"""
        audit_hook = AuditHook()
        # Simulate quality checks
        quality_results = {
            'null_check_passed': True,
            'duplicate_check_passed': True,
            'range_check_passed': True,
            'total_records': 9500,
            'quality_score': 0.98
        }
        audit_hook.audit_logger.log_event('data_quality_check', quality_results)
        # Fail the task (and therefore the run) below the 0.95 threshold.
        if quality_results['quality_score'] < 0.95:
            raise Exception(f"Data quality score too low: {quality_results['quality_score']}")

    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=data_quality_check,
        dag=dag
    )
    # Audit task completion
    audit_complete = AuditOperator(
        task_id='audit_pipeline_complete',
        event_type='pipeline_complete',
        details={'status': 'success'},
        dag=dag
    )
    # Define task dependencies
    audit_start >> extract_data >> transform_data >> load_data >> quality_check >> audit_complete
    return dag
Create the DAG
secure_etl_dag = create_secure_etl_dag()
Set up audit log monitoring
Configure log rotation and monitoring for audit logs to maintain compliance and performance.
# Rotate the Airflow audit log daily, keeping one year for compliance.
/opt/airflow/logs/audit.log {
    daily
    rotate 365
    compress
    # Keep the most recent rotated file uncompressed one cycle so late
    # writers are not cut off mid-write.
    delaycompress
    missingok
    notifempty
    create 644 airflow airflow
    postrotate
        # Signal the webserver so it reopens the log file handle.
        /bin/kill -HUP $(cat /var/run/airflow/webserver.pid 2>/dev/null) 2>/dev/null || true
    endscript
}
Create compliance monitoring dashboard
Set up monitoring queries to track security compliance and audit events.
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
class ComplianceMonitor:
    """Read-only compliance/reporting queries over the lineage database."""

    def __init__(self):
        self.postgres_hook = PostgresHook(postgres_conn_id='lineage_db')

    def get_access_violations(self, days: int = 7) -> List[Dict]:
        """Check for unauthorized access attempts.

        Placeholder: parsing the JSON audit log for failed access attempts
        depends on your deployment's log shipping.
        """
        # BUG FIX: return an empty list rather than implicit None so the
        # annotated List[Dict] contract holds for callers.
        return []

    def get_data_processing_report(self, days: int = 30) -> Dict[str, Any]:
        """Generate a data processing compliance report for the last *days* days."""
        # BUG FIX: the previous query used COUNT(DISTINCT unnest(...)),
        # which PostgreSQL rejects (set-returning functions are not allowed
        # inside aggregate calls). Per-type counts and distinct-dataset
        # totals are now computed by separate queries.
        per_type_sql = """
            SELECT
                transformation_type,
                COUNT(DISTINCT dag_id) AS active_dags,
                COUNT(*) AS transformation_count
            FROM data_lineage
            WHERE execution_date >= %s
            GROUP BY transformation_type
            ORDER BY transformation_count DESC
        """
        dataset_totals_sql = """
            SELECT
                (SELECT COUNT(DISTINCT s)
                   FROM data_lineage, unnest(source_datasets) AS s
                  WHERE execution_date >= %s) AS source_datasets,
                (SELECT COUNT(DISTINCT t)
                   FROM data_lineage, unnest(target_datasets) AS t
                  WHERE execution_date >= %s) AS target_datasets
        """
        since_date = datetime.now() - timedelta(days=days)
        transformations = self.postgres_hook.get_records(
            per_type_sql, parameters=(since_date,)
        )
        totals = self.postgres_hook.get_first(
            dataset_totals_sql, parameters=(since_date, since_date)
        )
        return {
            'period_days': days,
            'report_generated': datetime.now().isoformat(),
            'transformations': transformations,
            'distinct_source_datasets': totals[0] if totals else 0,
            'distinct_target_datasets': totals[1] if totals else 0,
        }

    def check_sensitive_data_access(self) -> Dict[str, Any]:
        """Monitor the last 24 hours of lineage for sensitive dataset names."""
        sensitive_patterns = Variable.get('sensitive_dataset_patterns',
                                          default_var=['pii', 'financial', 'medical'],
                                          deserialize_json=True)
        query = """
            SELECT dag_id, task_id, source_datasets, target_datasets, execution_date
            FROM data_lineage
            WHERE execution_date >= %s
              AND (array_to_string(source_datasets, ',') ~ %s
                   OR array_to_string(target_datasets, ',') ~ %s)
            ORDER BY execution_date DESC
            LIMIT 100
        """
        # The patterns are OR-ed into a single POSIX regex.
        pattern = '|'.join(sensitive_patterns)
        since_date = datetime.now() - timedelta(days=1)
        results = self.postgres_hook.get_records(query, parameters=(
            since_date, pattern, pattern
        ))
        return {
            'sensitive_data_access_events': len(results),
            'events': results[:10],  # first 10 for the dashboard view
            'patterns_monitored': sensitive_patterns
        }

    def generate_compliance_summary(self) -> Dict[str, Any]:
        """Generate an overall compliance status snapshot."""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_processing': self.get_data_processing_report(30),
            'sensitive_access': self.check_sensitive_data_access(),
            'compliance_score': self.calculate_compliance_score()
        }

    def calculate_compliance_score(self) -> float:
        """Calculate overall compliance score.

        Placeholder scoring; weight audit-log completeness, access-control
        violations, lineage coverage and policy adherence here.
        """
        return 0.95
Restart Airflow services
Restart all Airflow components to apply the security configuration changes.
# Stop all Airflow components before applying the new configuration.
sudo systemctl stop airflow-webserver
sudo systemctl stop airflow-scheduler
sudo systemctl stop airflow-worker
Clear any cached configurations
# Remove the stale webserver PID file and temporary DB artifacts.
rm -rf /opt/airflow/airflow-webserver.pid
rm -rf /tmp/airflowdb_*
Restart services
# Bring the stack back up with the new security configuration.
sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-worker
Verify services are running
# Confirm both core services report active (running).
sudo systemctl status airflow-webserver
sudo systemctl status airflow-scheduler
Verify your setup
Test the security configuration and verify all components are working correctly.
# Check RBAC is enabled
# Expect HTTP 200 with the config payload; 401 means the credentials are wrong.
curl -u admin:your_password http://localhost:8080/api/v1/config
Verify audit logging
tail -f /opt/airflow/logs/audit.log
Test DAG access control
# Smoke-test that the security module imports without errors.
cd /opt/airflow
source venv/bin/activate
python -c "from dags.utils.security import DagPermissionChecker; print('Security module loaded successfully')"
Check lineage database tables
psql -h localhost -U airflow -d lineage_db -c "\dt"
Test lineage tracking
python -c "from dags.utils.lineage import DataLineageTracker; tracker = DataLineageTracker(); print('Lineage tracker initialized')"
Verify plugin loading
airflow plugins | grep audit_plugin
sudo chown -R airflow:airflow /opt/airflow/logs
Configure user management
Set up additional users and assign appropriate roles for your team structure.
# Create users with specific roles
# Both commands prompt for a password; the roles must already exist
# (created earlier by the custom-roles script).
airflow users create \
    --username data_engineer_1 \
    --firstname John \
    --lastname Doe \
    --role DataEngineer \
    --email john.doe@example.com
airflow users create \
    --username data_analyst_1 \
    --firstname Jane \
    --lastname Smith \
    --role DataAnalyst \
    --email jane.smith@example.com
List all users and their roles
airflow users list
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| RBAC interface not loading | Database not initialized | airflow db upgrade and restart webserver |
| Audit logs not created | Permissions or plugin not loaded | Check /opt/airflow/logs permissions and verify plugin in airflow plugins |
| Lineage tracking fails | Database connection missing | Create connection: airflow connections add lineage_db --conn-type postgres --conn-host localhost |
| DAG access denied errors | User not assigned to role | Check user roles with airflow users list and assign with web UI |
| Custom permissions not working | Security manager cache | Restart webserver and clear /tmp/airflow* cache files |
Next steps
- Configure Apache Airflow LDAP authentication and RBAC with Active Directory integration
- Configure Apache Airflow monitoring with Prometheus alerts and Grafana dashboards
- Implement comprehensive Apache Airflow DAG testing and validation strategies with pytest and best practices
- Set up Apache Airflow data quality monitoring with Great Expectations integration
- Configure Apache Airflow secrets management with HashiCorp Vault integration
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

# Configuration
AIRFLOW_USER="airflow"
AIRFLOW_HOME="/opt/airflow"
ADMIN_EMAIL="${1:-admin@example.com}"
ADMIN_PASSWORD="${2:-admin123}"

usage() {
    echo "Usage: $0 [admin_email] [admin_password]"
    echo "Example: $0 admin@company.com securepassword"
    exit 1
}

# BUG FIX: usage() was defined but unreachable; honor -h/--help.
case "${1:-}" in
    -h|--help) usage ;;
esac

cleanup() {
    echo -e "${RED}Installation failed. Cleaning up...${NC}"
    systemctl stop airflow-webserver airflow-scheduler 2>/dev/null || true
    # NOTE(review): userdel -r removes the account AND its home directory
    # ($AIRFLOW_HOME) — destructive if /opt/airflow pre-existed; confirm
    # this is acceptable before reusing the script on a populated host.
    userdel -r "$AIRFLOW_USER" 2>/dev/null || true
}
trap cleanup ERR

# Check if running as root
if [[ $EUID -ne 0 ]]; then
    echo -e "${RED}This script must be run as root${NC}"
    exit 1
fi
echo -e "${GREEN}Starting Apache Airflow RBAC security setup...${NC}"

# Auto-detect distribution
echo -e "${YELLOW}[1/10] Detecting distribution...${NC}"
if [ -f /etc/os-release ]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_INSTALL="apt install -y"
            PKG_UPDATE="apt update"
            PYTHON_PKG="python3 python3-pip python3-venv"
            SERVICE_CMD="systemctl"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            # The whole dnf family shares identical settings, so the
            # previously duplicated fedora branch is merged in here.
            PKG_MGR="dnf"
            PKG_INSTALL="dnf install -y"
            PKG_UPDATE="dnf update -y"
            PYTHON_PKG="python3 python3-pip python3-virtualenv"
            SERVICE_CMD="systemctl"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_INSTALL="yum install -y"
            PKG_UPDATE="yum update -y"
            PYTHON_PKG="python3 python3-pip"
            SERVICE_CMD="systemctl"
            ;;
        *)
            echo -e "${RED}Unsupported distribution: $ID${NC}"
            exit 1
            ;;
    esac
    echo -e "${GREEN}Detected: $PRETTY_NAME${NC}"
else
    echo -e "${RED}Cannot detect distribution${NC}"
    exit 1
fi
# Update package manager
echo -e "${YELLOW}[2/10] Updating package manager...${NC}"
$PKG_UPDATE

# Install dependencies
echo -e "${YELLOW}[3/10] Installing dependencies...${NC}"
# BUG FIX: python3-dev/libpq-dev/postgresql-client are Debian package names;
# dnf/yum systems need python3-devel/libpq-devel, and their base postgresql
# package ships the client tools.
if [ "$PKG_MGR" = "apt" ]; then
    BUILD_PKGS="postgresql postgresql-client gcc python3-dev libpq-dev"
else
    BUILD_PKGS="postgresql gcc python3-devel libpq-devel"
fi
$PKG_INSTALL $PYTHON_PKG $BUILD_PKGS

# Create airflow user
echo -e "${YELLOW}[4/10] Creating airflow user...${NC}"
if ! id "$AIRFLOW_USER" &>/dev/null; then
    useradd -r -m -d "$AIRFLOW_HOME" -s /bin/bash "$AIRFLOW_USER"
fi

# Setup Python virtual environment
echo -e "${YELLOW}[5/10] Setting up Python virtual environment...${NC}"
sudo -u "$AIRFLOW_USER" python3 -m venv "$AIRFLOW_HOME/venv"
sudo -u "$AIRFLOW_USER" "$AIRFLOW_HOME/venv/bin/pip" install --upgrade pip
sudo -u "$AIRFLOW_USER" "$AIRFLOW_HOME/venv/bin/pip" install "apache-airflow[postgres,crypto]==2.7.0" psycopg2-binary

# Configure Airflow
echo -e "${YELLOW}[6/10] Configuring Airflow...${NC}"
mkdir -p "$AIRFLOW_HOME/config"
chown -R "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME"
cat > "$AIRFLOW_HOME/airflow.cfg" << 'EOF'
[core]
dags_folder = /opt/airflow/dags
base_log_folder = /opt/airflow/logs
logging_level = INFO
# BUG FIX: Airflow refuses to run the LocalExecutor against SQLite
# ("cannot use SQLite with the LocalExecutor"). Use SequentialExecutor
# with the bundled SQLite DB, or point sql_alchemy_conn at PostgreSQL
# to keep LocalExecutor.
executor = SequentialExecutor
# BUG FIX: an absolute SQLite path needs four slashes; sqlite:///opt/...
# resolved to the *relative* path opt/airflow/airflow.db.
sql_alchemy_conn = sqlite:////opt/airflow/airflow.db
load_examples = False
store_dag_code = True
store_serialized_dags = True
[webserver]
rbac = True
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
web_server_port = 8080
filter_by_owner = True
access_control_allow_origins = *
[api]
auth_backend = airflow.api.auth.backend.basic_auth
[logging]
logging_level = INFO
fab_logging_level = WARN
remote_logging = False
EOF
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME/airflow.cfg"
chmod 644 "$AIRFLOW_HOME/airflow.cfg"

# Create directories
echo -e "${YELLOW}[7/10] Creating Airflow directories...${NC}"
sudo -u "$AIRFLOW_USER" mkdir -p "$AIRFLOW_HOME"/{dags,logs,plugins,scripts}

# Initialize database and create admin user
echo -e "${YELLOW}[8/10] Initializing Airflow database...${NC}"
export AIRFLOW_HOME="$AIRFLOW_HOME"
sudo -u "$AIRFLOW_USER" bash -c "
cd $AIRFLOW_HOME
source venv/bin/activate
export AIRFLOW_HOME=$AIRFLOW_HOME
airflow db init
airflow users create --username admin --firstname Admin --lastname User --role Admin --email $ADMIN_EMAIL --password $ADMIN_PASSWORD
"
# Create custom roles script
# Quoted 'EOF' prevents shell expansion inside the embedded Python.
cat > "$AIRFLOW_HOME/scripts/setup_roles.py" << 'EOF'
#!/usr/bin/env python3
import os
# AIRFLOW_HOME must be set before any airflow import reads the config.
os.environ['AIRFLOW_HOME'] = '/opt/airflow'
from airflow import settings
from airflow.www.app import cached_app
def create_custom_roles():
    # Use the webserver's Flask app to reach the FAB security manager.
    app = cached_app()
    with app.app_context():
        security_manager = app.appbuilder.sm
        # Create Data Engineer role
        data_engineer_role = security_manager.find_role('DataEngineer')
        if not data_engineer_role:
            data_engineer_role = security_manager.add_role('DataEngineer')
        # Create Data Analyst role
        data_analyst_role = security_manager.find_role('DataAnalyst')
        if not data_analyst_role:
            data_analyst_role = security_manager.add_role('DataAnalyst')
        # Add permissions for Data Engineer
        de_perms = ['can_read', 'can_edit', 'can_create']
        da_perms = ['can_read']
        for view_name in ['DAGs', 'Task Instances', 'Log']:
            for perm_name in de_perms:
                perm = security_manager.find_permission_view_menu(perm_name, view_name)
                if perm and data_engineer_role:
                    security_manager.add_permission_role(data_engineer_role, perm)
            for perm_name in da_perms:
                perm = security_manager.find_permission_view_menu(perm_name, view_name)
                if perm and data_analyst_role:
                    security_manager.add_permission_role(data_analyst_role, perm)
if __name__ == '__main__':
    create_custom_roles()
    print("Custom roles created successfully")
EOF
chown "$AIRFLOW_USER:$AIRFLOW_USER" "$AIRFLOW_HOME/scripts/setup_roles.py"
chmod 755 "$AIRFLOW_HOME/scripts/setup_roles.py"
# Run custom roles setup
sudo -u "$AIRFLOW_USER" bash -c "
cd $AIRFLOW_HOME
source venv/bin/activate
export AIRFLOW_HOME=$AIRFLOW_HOME
python3 scripts/setup_roles.py
"
# Create systemd services
echo -e "${YELLOW}[9/10] Creating systemd services...${NC}"
cat > /etc/systemd/system/airflow-webserver.service << EOF
[Unit]
Description=Airflow webserver daemon
After=network.target
[Service]
Type=simple
User=$AIRFLOW_USER
Group=$AIRFLOW_USER
WorkingDirectory=$AIRFLOW_HOME
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
ExecStart=$AIRFLOW_HOME/venv/bin/airflow webserver
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
cat > /etc/systemd/system/airflow-scheduler.service << EOF
[Unit]
Description=Airflow scheduler daemon
After=network.target
[Service]
Type=simple
User=$AIRFLOW_USER
Group=$AIRFLOW_USER
WorkingDirectory=$AIRFLOW_HOME
Environment=AIRFLOW_HOME=$AIRFLOW_HOME
ExecStart=$AIRFLOW_HOME/venv/bin/airflow scheduler
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Configure firewall (best-effort)
# BUG FIX: firewall-cmd exits non-zero when firewalld is installed but not
# running, which aborted the whole script under `set -euo pipefail`.
if command -v firewall-cmd &> /dev/null; then
    firewall-cmd --permanent --add-port=8080/tcp || true
    firewall-cmd --reload || true
elif command -v ufw &> /dev/null; then
    ufw allow 8080/tcp || true
fi
# Start services
systemctl daemon-reload
systemctl enable airflow-webserver airflow-scheduler
systemctl start airflow-webserver airflow-scheduler
# Verification
echo -e "${YELLOW}[10/10] Verifying installation...${NC}"
# Give the webserver/scheduler a moment to come up before probing.
sleep 10
if systemctl is-active --quiet airflow-webserver && systemctl is-active --quiet airflow-scheduler; then
    echo -e "${GREEN}✓ Airflow services are running${NC}"
else
    echo -e "${RED}✗ Airflow services failed to start${NC}"
    exit 1
fi
# The health endpoint needs no authentication; a slow first start is not fatal.
if curl -s http://localhost:8080/health > /dev/null; then
    echo -e "${GREEN}✓ Airflow web interface is accessible${NC}"
else
    echo -e "${YELLOW}⚠ Airflow web interface may still be starting${NC}"
fi
echo -e "${GREEN}Apache Airflow RBAC security setup complete!${NC}"
echo -e "${GREEN}Access the web interface at: http://$(hostname -I | awk '{print $1}'):8080${NC}"
echo -e "${GREEN}Admin credentials - Username: admin, Password: $ADMIN_PASSWORD${NC}"
echo -e "${GREEN}Custom roles created: DataEngineer, DataAnalyst${NC}"
Review the script before running. Execute with: bash install.sh