Set up Apache Airflow DAG security policies and data governance with RBAC and audit logging

Advanced · 45 min · Apr 21, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure comprehensive security for Apache Airflow with role-based access control, DAG-level permissions, data lineage tracking, and audit logging for compliance monitoring in production environments.

Prerequisites

  • Apache Airflow 2.0+ installed
  • PostgreSQL database configured
  • Python 3.8+ with pip
  • Root or sudo access

What this solves

Apache Airflow's default configuration provides minimal security controls, leaving DAGs and data accessible to all users. This tutorial implements enterprise-grade security with role-based access control (RBAC), DAG-level permissions, data governance through lineage tracking, and comprehensive audit logging. You'll secure workflow access, track data transformations, and maintain compliance audit trails for production environments.

Step-by-step configuration

Enable RBAC authentication

In Airflow 2.x the RBAC webserver is always enabled, and the 1.x options rbac, authenticate, and airflow.contrib.auth.backends.* no longer exist. Configure API authentication in airflow.cfg; web UI authentication is configured in webserver_config.py (Flask-AppBuilder), not in airflow.cfg.

[api]
auth_backend = airflow.api.auth.backend.basic_auth
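Web UI authentication for Airflow 2.x lives in $AIRFLOW_HOME/webserver_config.py. A minimal sketch using database-backed authentication (the default); adjust AUTH_TYPE if you use LDAP or OAuth:

```
# $AIRFLOW_HOME/webserver_config.py
from flask_appbuilder.security.manager import AUTH_DB

# Authenticate users against the Airflow metadata database.
AUTH_TYPE = AUTH_DB

# Disable self-registration in a locked-down install.
AUTH_USER_REGISTRATION = False
```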

Initialize RBAC database tables

Create the necessary database tables for RBAC user management and permissions.

cd /opt/airflow
source venv/bin/activate
airflow db init
airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password 'change_me'

Configure custom roles and permissions

Create a Python script to define custom roles with specific DAG and task permissions.

# Only cached_app is needed; the security manager is reached through the app.
from airflow.www.app import cached_app

def create_custom_roles():
    app = cached_app()
    with app.app_context():
        security_manager = app.appbuilder.sm
        
        # Create Data Engineer role
        data_engineer_role = security_manager.find_role('DataEngineer')
        if not data_engineer_role:
            data_engineer_role = security_manager.add_role('DataEngineer')
            
        # Create Data Analyst role
        data_analyst_role = security_manager.find_role('DataAnalyst')
        if not data_analyst_role:
            data_analyst_role = security_manager.add_role('DataAnalyst')
            
        # Add permissions to Data Engineer
        perms_de = [
            'can_read on DAGs',
            'can_edit on DAGs', 
            'can_read on Task Instances',
            'can_create on Task Instances',
            'can_read on Log',
            'menu_access on Browse'
        ]
        
        # Add permissions to Data Analyst
        perms_da = [
            'can_read on DAGs',
            'can_read on Task Instances',
            'can_read on Log',
            'menu_access on Browse'
        ]
        
        for perm_str in perms_de:
            perm = security_manager.find_permission_view_menu(*perm_str.split(' on '))
            if perm:
                security_manager.add_permission_role(data_engineer_role, perm)
                
        for perm_str in perms_da:
            perm = security_manager.find_permission_view_menu(*perm_str.split(' on '))
            if perm:
                security_manager.add_permission_role(data_analyst_role, perm)

if __name__ == '__main__':
    create_custom_roles()
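The `'<action> on <resource>'` strings above are just a readable shorthand that the script splits into the (permission, view-menu) pairs Flask-AppBuilder expects. A minimal standalone sketch of that mapping (pure Python, no Airflow required):

```python
# Shorthand permission strings, as used in the role script above.
perms_de = [
    'can_read on DAGs',
    'can_edit on DAGs',
    'can_read on Task Instances',
]

def to_fab_pair(perm_str: str) -> tuple:
    """Split 'can_read on DAGs' into ('can_read', 'DAGs')."""
    action, resource = perm_str.split(' on ', 1)
    return action, resource

pairs = [to_fab_pair(p) for p in perms_de]
print(pairs)
# → [('can_read', 'DAGs'), ('can_edit', 'DAGs'), ('can_read', 'Task Instances')]
```

The `maxsplit=1` argument keeps resource names containing spaces (like "Task Instances") intact.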

Implement DAG-level access control

In Airflow 2.x, per-DAG permissions are granted through the access_control argument on the DAG object or through the role editor in the UI; the 1.x option filter_by_owner was removed. The related airflow.cfg settings below harden the API (the origin shown is an example):

[api]
# Restrict CORS to trusted origins; never use '*' in production.
access_control_allow_origins = https://airflow.example.com

[core]
# DAG serialization is always on in Airflow 2.x, so the old
# store_dag_code / store_serialized_dags flags are obsolete.

Create DAG access control decorator

Implement a decorator to control DAG access based on user permissions and tags.

from functools import wraps
from airflow.models import Variable
from airflow.exceptions import AirflowException
from airflow.utils.session import provide_session  # moved out of airflow.utils.db in Airflow 2.x
from airflow.models.dagbag import DagBag
from flask_login import current_user

def dag_access_control(allowed_roles=None, required_tags=None):
    """
    Decorator to control DAG access based on user roles and tags
    """
    def decorator(dag_function):
        @wraps(dag_function)
        def wrapper(*args, **kwargs):
            if allowed_roles and hasattr(current_user, 'roles'):
                user_roles = [role.name for role in current_user.roles]
                if not any(role in allowed_roles for role in user_roles):
                    raise AirflowException(f"Access denied. Required roles: {allowed_roles}")
            
            dag = dag_function(*args, **kwargs)
            
            # Add access control tags
            if required_tags:
                existing_tags = set(dag.tags or [])
                dag.tags = list(existing_tags.union(set(required_tags)))
            
            return dag
        return wrapper
    return decorator

class DagPermissionChecker:
    @staticmethod
    @provide_session
    def can_access_dag(dag_id, user, session=None):
        """Check if user can access specific DAG"""
        dag_bag = DagBag()
        dag = dag_bag.get_dag(dag_id)
        
        if not dag:
            return False
            
        # Check owner-based access
        if hasattr(dag, 'owner') and dag.owner == user.username:
            return True
            
        # Check role-based access
        if hasattr(user, 'roles'):
            user_roles = [role.name for role in user.roles]
            if 'Admin' in user_roles:
                return True
                
            # Check tag-based permissions
            if dag.tags:
                allowed_tags = Variable.get(f"dag_access_{user.username}", default_var=[], deserialize_json=True)
                if any(tag in allowed_tags for tag in dag.tags):
                    return True
        
        return False
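The decision logic in DagPermissionChecker can be reasoned about in isolation: owner match, Admin role, or an overlap between the DAG's tags and the user's allowed tags. A standalone sketch of the same checks (pure Python; the role and tag values are illustrative):

```python
def can_access(user_roles, dag_owner, username, dag_tags, allowed_tags):
    """Mirror DagPermissionChecker: owner match, Admin role,
    or any overlap between the DAG's tags and the user's allowed tags."""
    if dag_owner == username:
        return True
    if 'Admin' in user_roles:
        return True
    return any(tag in allowed_tags for tag in dag_tags)

# Illustrative values
print(can_access(['DataAnalyst'], 'alice', 'bob', ['etl'], ['etl']))      # True: tag overlap
print(can_access(['DataAnalyst'], 'alice', 'bob', ['finance'], ['etl']))  # False: no overlap
```

Extracting the decision into a pure function like this makes the policy unit-testable without a webserver or database session.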

Enable audit logging

Configure comprehensive audit logging to track all user actions and system events.

[logging]
logging_level = INFO
fab_logging_level = INFO
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
logging_config_class = airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

Create audit logging plugin

Implement a custom plugin to capture detailed audit events for compliance tracking.

import json
import logging
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults  # no-op since Airflow 2.0; safe to drop
from airflow.hooks.base import BaseHook  # airflow.hooks.base_hook was removed in Airflow 2.x
from flask import request, has_request_context
from flask_login import current_user

class AuditLogger:
    def __init__(self):
        self.logger = logging.getLogger('airflow.audit')
        # Loggers are process-wide singletons: guard against attaching a
        # duplicate handler every time this class is instantiated.
        if not self.logger.handlers:
            handler = logging.FileHandler('/opt/airflow/logs/audit.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_event(self, event_type, details):
        audit_event = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'user': getattr(current_user, 'username', 'anonymous'),
            # has_request_context() avoids a RuntimeError when logging from
            # the scheduler or a worker, where no Flask request is active.
            'ip_address': request.remote_addr if has_request_context() else 'system',
            'details': details
        }
        self.logger.info(json.dumps(audit_event))

class AuditHook(BaseHook):
    def __init__(self):
        super().__init__()
        self.audit_logger = AuditLogger()
    
    def log_dag_run(self, dag_id, run_id, state):
        self.audit_logger.log_event('dag_run', {
            'dag_id': dag_id,
            'run_id': run_id,
            'state': state
        })
    
    def log_task_instance(self, dag_id, task_id, execution_date, state):
        self.audit_logger.log_event('task_instance', {
            'dag_id': dag_id,
            'task_id': task_id,
            'execution_date': execution_date.isoformat(),
            'state': state
        })
    
    def log_connection_access(self, conn_id):
        self.audit_logger.log_event('connection_access', {
            'connection_id': conn_id
        })
    
    def log_variable_access(self, key, action):
        self.audit_logger.log_event('variable_access', {
            'variable_key': key,
            'action': action
        })
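Because each audit record is one JSON object per line (behind the formatter's timestamp/level prefix), the log can be analyzed with a few lines of standard-library Python. A sketch that counts events by type; the sample lines mimic the AuditLogger output format above:

```python
import json
from collections import Counter

# Lines as written by AuditLogger's formatter:
# '%(asctime)s - %(levelname)s - %(message)s'
sample = [
    '2026-04-21 10:00:00,123 - INFO - ' + json.dumps({'event_type': 'dag_run', 'user': 'admin'}),
    '2026-04-21 10:00:01,456 - INFO - ' + json.dumps({'event_type': 'connection_access', 'user': 'etl_svc'}),
    '2026-04-21 10:00:02,789 - INFO - ' + json.dumps({'event_type': 'dag_run', 'user': 'admin'}),
]

def parse_event(line):
    # Strip the 'timestamp - LEVEL - ' prefix; the JSON payload is the rest.
    return json.loads(line.split(' - ', 2)[2])

counts = Counter(parse_event(l)['event_type'] for l in sample)
print(counts)
# → Counter({'dag_run': 2, 'connection_access': 1})
```

The same parsing loop, pointed at /opt/airflow/logs/audit.log, is the basis for the compliance queries later in this tutorial.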

class AuditOperator(BaseOperator):
    # @apply_defaults is unnecessary in Airflow 2.x (it became a no-op)
    def __init__(self, event_type, details=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event_type = event_type
        self.details = details or {}
    
    def execute(self, context):
        # Instantiate the hook at execute time so DAG parsing does not
        # open the audit log file on the scheduler.
        audit_hook = AuditHook()
        audit_hook.audit_logger.log_event(self.event_type, {
            **self.details,
            'dag_id': context['dag'].dag_id,
            'task_id': context['task'].task_id,
            'execution_date': context['execution_date'].isoformat()
        })

class AuditPlugin(AirflowPlugin):
    name = 'audit_plugin'
    operators = [AuditOperator]
    hooks = [AuditHook]

Configure data lineage tracking

Set up data lineage tracking to monitor data flow and transformations across DAGs.

import json
from typing import List, Dict, Any
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults  # no-op since Airflow 2.0
from airflow.providers.postgres.hooks.postgres import PostgresHook  # airflow.hooks.postgres_hook is deprecated
from datetime import datetime

class DataLineageTracker:
    def __init__(self):
        self.postgres_hook = PostgresHook(postgres_conn_id='lineage_db')
        self._ensure_lineage_tables()
    
    def _ensure_lineage_tables(self):
        create_tables_sql = """
        CREATE TABLE IF NOT EXISTS data_lineage (
            id SERIAL PRIMARY KEY,
            dag_id VARCHAR(250) NOT NULL,
            task_id VARCHAR(250) NOT NULL,
            execution_date TIMESTAMP NOT NULL,
            source_datasets TEXT[],
            target_datasets TEXT[],
            transformation_type VARCHAR(100),
            metadata JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE TABLE IF NOT EXISTS dataset_registry (
            id SERIAL PRIMARY KEY,
            dataset_name VARCHAR(500) UNIQUE NOT NULL,
            dataset_type VARCHAR(100),
            schema_info JSONB,
            owner VARCHAR(250),
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE INDEX IF NOT EXISTS idx_lineage_dag_task 
        ON data_lineage(dag_id, task_id);
        
        CREATE INDEX IF NOT EXISTS idx_lineage_execution_date 
        ON data_lineage(execution_date);
        """
        self.postgres_hook.run(create_tables_sql)
    
    def track_lineage(self, dag_id: str, task_id: str, execution_date: datetime,
                     source_datasets: List[str], target_datasets: List[str],
                     transformation_type: str, metadata: Dict[str, Any] = None):
        insert_sql = """
        INSERT INTO data_lineage 
        (dag_id, task_id, execution_date, source_datasets, target_datasets, 
         transformation_type, metadata)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        
        self.postgres_hook.run(insert_sql, parameters=(
            dag_id, task_id, execution_date,
            source_datasets, target_datasets,
            transformation_type, json.dumps(metadata or {})
        ))
    
    def register_dataset(self, dataset_name: str, dataset_type: str,
                        schema_info: Dict[str, Any], owner: str, description: str = None):
        insert_sql = """
        INSERT INTO dataset_registry 
        (dataset_name, dataset_type, schema_info, owner, description)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (dataset_name) 
        DO UPDATE SET 
            schema_info = EXCLUDED.schema_info,
            updated_at = CURRENT_TIMESTAMP
        """
        
        self.postgres_hook.run(insert_sql, parameters=(
            dataset_name, dataset_type, json.dumps(schema_info), owner, description
        ))
    
    def get_lineage_graph(self, dataset_name: str, depth: int = 3):
        """Get upstream and downstream lineage for a dataset"""
        query_sql = """
        WITH RECURSIVE lineage_tree AS (
            -- Base case: direct lineage
            SELECT dag_id, task_id, source_datasets, target_datasets, 0 as level
            FROM data_lineage 
            WHERE %s = ANY(source_datasets) OR %s = ANY(target_datasets)
            
            UNION ALL
            
            -- Recursive case: expand lineage (&& is the PostgreSQL
            -- array-overlap operator; array_overlap() does not exist)
            SELECT dl.dag_id, dl.task_id, dl.source_datasets, dl.target_datasets, lt.level + 1
            FROM data_lineage dl
            JOIN lineage_tree lt ON (
                dl.target_datasets && lt.source_datasets OR
                dl.source_datasets && lt.target_datasets
            )
            WHERE lt.level < %s
        )
        SELECT DISTINCT * FROM lineage_tree ORDER BY level;
        """
        
        result = self.postgres_hook.get_records(query_sql, parameters=(
            dataset_name, dataset_name, depth
        ))
        return result
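The recursive SQL above returns lineage edges; the same traversal can be prototyped in plain Python, which is handy for unit-testing lineage logic without a database. A sketch with illustrative dataset names matching the example DAG later in this tutorial:

```python
from collections import deque

# Each edge: (source_datasets, target_datasets), as stored by track_lineage.
edges = [
    (['production.customers'], ['staging.customer_extract']),
    (['staging.customer_extract'], ['warehouse.customer_clean']),
    (['warehouse.customer_clean'], ['analytics.customer_mart']),
]

def downstream(dataset, edges, depth=3):
    """Breadth-first walk over target datasets, up to `depth` hops."""
    seen, queue = set(), deque([(dataset, 0)])
    while queue:
        name, level = queue.popleft()
        if level >= depth:
            continue
        for sources, targets in edges:
            if name in sources:
                for t in targets:
                    if t not in seen:
                        seen.add(t)
                        queue.append((t, level + 1))
    return seen

print(sorted(downstream('production.customers', edges)))
# → ['analytics.customer_mart', 'staging.customer_extract', 'warehouse.customer_clean']
```

The `seen` set doubles as a cycle guard, mirroring the `level < depth` cap in the SQL version.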

class LineageOperator(BaseOperator):
    def __init__(self, source_datasets: List[str], target_datasets: List[str],
                 transformation_type: str, metadata: Dict[str, Any] = None,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.source_datasets = source_datasets
        self.target_datasets = target_datasets
        self.transformation_type = transformation_type
        self.metadata = metadata or {}
    
    def execute(self, context):
        # Build the tracker here rather than in __init__: operators are
        # instantiated at DAG-parse time, and the tracker's constructor
        # runs DDL against the lineage database.
        lineage_tracker = DataLineageTracker()
        lineage_tracker.track_lineage(
            context['dag'].dag_id,
            context['task'].task_id,
            context['execution_date'],
            self.source_datasets,
            self.target_datasets,
            self.transformation_type,
            self.metadata
        )

Create secure DAG template

Implement a template DAG that demonstrates security policies, audit logging, and lineage tracking.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is deprecated in 2.x
from utils.security import dag_access_control
from utils.lineage import LineageOperator, DataLineageTracker
from plugins.audit_plugin import AuditOperator, AuditHook

Security configuration

default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Apply access control decorator

@dag_access_control(
    allowed_roles=['Admin', 'DataEngineer'],
    required_tags=['etl', 'production', 'sensitive']
)
def create_secure_etl_dag():
    dag = DAG(
        'secure_etl_example',
        default_args=default_args,
        description='Secure ETL DAG with audit logging and lineage tracking',
        schedule_interval='@daily',
        catchup=False,
        tags=['etl', 'production', 'sensitive'],
        max_active_runs=1,
        doc_md="""
        # Secure ETL Pipeline

        This DAG demonstrates:
        - Role-based access control
        - Data lineage tracking
        - Audit logging
        - Secure data processing

        Access: DataEngineer role required
        Data Classification: Sensitive
        """
    )

    # Audit task start
    audit_start = AuditOperator(
        task_id='audit_pipeline_start',
        event_type='pipeline_start',
        details={'classification': 'sensitive'},
        dag=dag
    )

    # Data extraction with lineage tracking
    extract_data = LineageOperator(
        task_id='extract_customer_data',
        source_datasets=['production.customers', 'production.orders'],
        target_datasets=['staging.customer_extract'],
        transformation_type='extract',
        metadata={
            'extraction_method': 'incremental',
            'filter_conditions': 'created_date > CURRENT_DATE - 1',
            'row_count_estimate': 10000
        },
        dag=dag
    )

    def secure_transform(**context):
        """Secure data transformation with audit logging"""
        audit_hook = AuditHook()
        lineage_tracker = DataLineageTracker()
        try:
            # Log data access
            audit_hook.log_connection_access('production_db')

            # Simulate secure data transformation
            # In a real implementation, add encryption, masking, etc.
            transformation_metadata = {
                'records_processed': 9500,
                'pii_fields_masked': ['email', 'phone'],
                'encryption_applied': True
            }

            # Track transformation lineage
            lineage_tracker.track_lineage(
                context['dag'].dag_id,
                context['task'].task_id,
                context['execution_date'],
                ['staging.customer_extract'],
                ['warehouse.customer_clean'],
                'transform',
                transformation_metadata
            )

            # Log successful completion
            audit_hook.audit_logger.log_event('data_transformation', {
                'status': 'success',
                **transformation_metadata
            })
        except Exception as e:
            audit_hook.audit_logger.log_event('data_transformation', {
                'status': 'failed',
                'error': str(e)
            })
            raise

    transform_data = PythonOperator(
        task_id='transform_customer_data',
        python_callable=secure_transform,
        dag=dag
    )

    # Data loading with lineage
    load_data = LineageOperator(
        task_id='load_customer_data',
        source_datasets=['warehouse.customer_clean'],
        target_datasets=['analytics.customer_mart'],
        transformation_type='load',
        metadata={
            'load_method': 'upsert',
            'target_table': 'customer_mart',
            'business_keys': ['customer_id']
        },
        dag=dag
    )

    # Data quality check
    def data_quality_check(**context):
        """Perform data quality checks with audit logging"""
        audit_hook = AuditHook()

        # Simulate quality checks
        quality_results = {
            'null_check_passed': True,
            'duplicate_check_passed': True,
            'range_check_passed': True,
            'total_records': 9500,
            'quality_score': 0.98
        }
        audit_hook.audit_logger.log_event('data_quality_check', quality_results)

        if quality_results['quality_score'] < 0.95:
            raise Exception(f"Data quality score too low: {quality_results['quality_score']}")

    quality_check = PythonOperator(
        task_id='data_quality_check',
        python_callable=data_quality_check,
        dag=dag
    )

    # Audit task completion
    audit_complete = AuditOperator(
        task_id='audit_pipeline_complete',
        event_type='pipeline_complete',
        details={'status': 'success'},
        dag=dag
    )

    # Define task dependencies
    audit_start >> extract_data >> transform_data >> load_data >> quality_check >> audit_complete

    return dag

Create the DAG

secure_etl_dag = create_secure_etl_dag()

Set up audit log monitoring

Configure log rotation and monitoring for audit logs to maintain compliance and performance.

/opt/airflow/logs/audit.log {
    daily
    rotate 365
    compress
    delaycompress
    missingok
    notifempty
    create 644 airflow airflow
    postrotate
        /bin/kill -HUP $(cat /var/run/airflow/webserver.pid 2>/dev/null) 2>/dev/null || true
    endscript
}
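The postrotate HUP above assumes the webserver reopens its log files on SIGHUP. An alternative that avoids signal handling entirely is Python's logging.handlers.WatchedFileHandler, which reopens the file automatically after logrotate renames it. A minimal standalone demo (writes to a temporary path, not the real audit log):

```python
import json
import logging
import logging.handlers
import os
import tempfile

log_path = os.path.join(tempfile.mkdtemp(), 'audit.log')

logger = logging.getLogger('audit_demo')
logger.setLevel(logging.INFO)
# WatchedFileHandler stats the file before each emit and reopens it
# if the inode changed (i.e. after logrotate moved it aside).
handler = logging.handlers.WatchedFileHandler(log_path)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

logger.info(json.dumps({'event_type': 'demo', 'details': {}}))
handler.flush()

with open(log_path) as f:
    print(f.read().strip())
```

Swapping FileHandler for WatchedFileHandler in the AuditLogger class lets you drop the postrotate block from the logrotate config.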

Create compliance monitoring dashboard

Set up monitoring queries to track security compliance and audit events.

from datetime import datetime, timedelta
from typing import Dict, List, Any
from airflow.providers.postgres.hooks.postgres import PostgresHook  # airflow.hooks.postgres_hook is deprecated
from airflow.models import Variable

class ComplianceMonitor:
    def __init__(self):
        self.postgres_hook = PostgresHook(postgres_conn_id='lineage_db')
    
    def get_access_violations(self, days: int = 7) -> List[Dict]:
        """Check for unauthorized access attempts"""
        # This would parse audit logs for failed access attempts
        # Implementation depends on your audit log format
        pass
    
    def get_data_processing_report(self, days: int = 30) -> Dict[str, Any]:
        """Generate data processing compliance report"""
        query = """
        -- Note: set-returning functions such as unnest() cannot appear
        -- inside aggregates in PostgreSQL, so per-dataset counts are
        -- omitted here; compute them separately with a LATERAL unnest.
        SELECT 
            transformation_type,
            COUNT(DISTINCT dag_id) as active_dags,
            COUNT(*) as transformation_count
        FROM data_lineage 
        WHERE execution_date >= %s
        GROUP BY transformation_type
        ORDER BY transformation_count DESC
        """
        
        since_date = datetime.now() - timedelta(days=days)
        results = self.postgres_hook.get_records(query, parameters=(since_date,))
        
        return {
            'period_days': days,
            'report_generated': datetime.now().isoformat(),
            'transformations': results
        }
    
    def check_sensitive_data_access(self) -> Dict[str, Any]:
        """Monitor access to sensitive datasets"""
        sensitive_patterns = Variable.get('sensitive_dataset_patterns', 
                                        default_var=['pii', 'financial', 'medical'],
                                        deserialize_json=True)
        
        query = """
        SELECT dag_id, task_id, source_datasets, target_datasets, execution_date
        FROM data_lineage 
        WHERE execution_date >= %s
        AND (array_to_string(source_datasets, ',') ~ %s 
             OR array_to_string(target_datasets, ',') ~ %s)
        ORDER BY execution_date DESC
        LIMIT 100
        """
        
        pattern = '|'.join(sensitive_patterns)
        since_date = datetime.now() - timedelta(days=1)
        
        results = self.postgres_hook.get_records(query, parameters=(
            since_date, pattern, pattern
        ))
        
        return {
            'sensitive_data_access_events': len(results),
            'events': results[:10],  # Return first 10 for dashboard
            'patterns_monitored': sensitive_patterns
        }
    
    def generate_compliance_summary(self) -> Dict[str, Any]:
        """Generate overall compliance status"""
        return {
            'timestamp': datetime.now().isoformat(),
            'data_processing': self.get_data_processing_report(30),
            'sensitive_access': self.check_sensitive_data_access(),
            'compliance_score': self.calculate_compliance_score()
        }
    
    def calculate_compliance_score(self) -> float:
        """Calculate overall compliance score"""
        # Implement scoring logic based on:
        # - Audit log completeness
        # - Access control violations
        # - Data lineage coverage
        # - Policy adherence
        return 0.95  # Placeholder
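calculate_compliance_score above is a placeholder; one simple, defensible implementation is a weighted average of the sub-checks listed in the comments. A standalone sketch (the weights and metric names are illustrative, not a standard):

```python
def compliance_score(metrics, weights=None):
    """Weighted average of normalized (0..1) compliance metrics."""
    weights = weights or {
        'audit_log_completeness': 0.3,
        'access_control': 0.3,
        'lineage_coverage': 0.2,
        'policy_adherence': 0.2,
    }
    total = sum(weights.values())
    return sum(metrics[k] * w for k, w in weights.items()) / total

# Illustrative inputs for each sub-check
score = compliance_score({
    'audit_log_completeness': 1.0,
    'access_control': 0.9,
    'lineage_coverage': 0.8,
    'policy_adherence': 1.0,
})
print(round(score, 3))
# → 0.93
```

Dividing by the weight total keeps the score in [0, 1] even if the weights are later changed and no longer sum to one.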

Restart Airflow services

Restart all Airflow components to apply the security configuration changes.

sudo systemctl stop airflow-webserver
sudo systemctl stop airflow-scheduler
sudo systemctl stop airflow-worker

Clear any cached configurations

rm -rf /opt/airflow/airflow-webserver.pid
rm -rf /tmp/airflowdb_*

Restart services

sudo systemctl start airflow-webserver
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-worker

Verify services are running

sudo systemctl status airflow-webserver
sudo systemctl status airflow-scheduler

Verify your setup

Test the security configuration and verify all components are working correctly.

# Check RBAC is enabled
curl -u admin:your_password http://localhost:8080/api/v1/config

Verify audit logging

tail -f /opt/airflow/logs/audit.log

Test DAG access control

cd /opt/airflow
source venv/bin/activate
python -c "from dags.utils.security import DagPermissionChecker; print('Security module loaded successfully')"

Check lineage database tables

psql -h localhost -U airflow -d lineage_db -c "\dt"

Test lineage tracking

python -c "from dags.utils.lineage import DataLineageTracker; tracker = DataLineageTracker(); print('Lineage tracker initialized')"

Verify plugin loading

airflow plugins | grep audit_plugin

Note: If you see permission errors, ensure the airflow user has read/write access to the log directories: sudo chown -R airflow:airflow /opt/airflow/logs

Configure user management

Set up additional users and assign appropriate roles for your team structure.

# Create users with specific roles
airflow users create \
    --username data_engineer_1 \
    --firstname John \
    --lastname Doe \
    --role DataEngineer \
    --email john.doe@example.com

airflow users create \
    --username data_analyst_1 \
    --firstname Jane \
    --lastname Smith \
    --role DataAnalyst \
    --email jane.smith@example.com

List all users and their roles

airflow users list

Common issues

Symptom | Cause | Fix
RBAC interface not loading | Database not initialized | Run airflow db upgrade and restart the webserver
Audit logs not created | Permissions or plugin not loaded | Check /opt/airflow/logs permissions and verify the plugin appears in airflow plugins
Lineage tracking fails | Database connection missing | Create the connection: airflow connections add lineage_db --conn-type postgres --conn-host localhost
DAG access denied errors | User not assigned to role | Check user roles with airflow users list and assign roles in the web UI
Custom permissions not working | Security manager cache | Restart the webserver and clear /tmp/airflow* cache files
