Implement Airflow DAG security scanning with Bandit and safety checks

Intermediate 45 min Apr 23, 2026 88 views
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up automated security scanning for Apache Airflow DAGs using Bandit for Python code analysis and Safety for vulnerability detection. Configure pre-commit hooks and CI/CD integration for continuous security monitoring.

Prerequisites

  • Apache Airflow installation
  • Python 3.8 or later
  • Git repository for DAGs
  • Basic Linux system administration

What this solves

Apache Airflow DAGs often contain sensitive operations like database connections, API calls, and data processing logic that can introduce security vulnerabilities. This tutorial implements automated security scanning using Bandit for Python security issues and Safety for known vulnerability detection, ensuring your DAGs meet security standards before deployment.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you get the latest versions of Python and security tools.

sudo apt update && sudo apt upgrade -y
sudo apt install -y python3-pip python3-venv git
sudo dnf update -y
sudo dnf install -y python3-pip python3-venv git

Install Bandit and Safety security scanners

Install Bandit for Python security scanning and Safety for vulnerability detection. Create a virtual environment to isolate the security tools.

python3 -m venv /opt/airflow-security
source /opt/airflow-security/bin/activate
pip install bandit[toml] safety pre-commit gitpython

Create Bandit configuration file

Configure Bandit with custom rules for Airflow DAGs. This configuration excludes test files and focuses on DAG-specific security issues.

[bandit]
exclude_dirs = ["/opt/airflow/logs", "/opt/airflow/plugins/__pycache__"]
skips = ["B101", "B601"]

[bandit.assert_used]
skips = ["_test.py", "/test_*.py"]

[bandit.hardcoded_password_string]
word_list = ["password", "pass", "passwd", "pwd", "secret", "token"]

[bandit.hardcoded_password_funcarg]
word_list = ["password", "pass", "passwd", "pwd", "secret", "token"]

[bandit.hardcoded_password_default]
word_list = ["password", "pass", "passwd", "pwd", "secret", "token"]

[bandit.sql_injection]
sql_statements = ["select", "insert", "update", "delete", "create", "drop"]
qualname_patterns = [".execute", ".executemany", "*.cursor"]

[bandit.shell_injection]
shell_wraps = ["os.system", "os.popen", "subprocess.call", "subprocess.run"]
subprocess_without_shell_equals_true = [
    "subprocess.Popen",
    "subprocess.call",
    "subprocess.check_call",
    "subprocess.check_output",
    "subprocess.run"
]

Create Safety configuration

Configure Safety to check for known vulnerabilities in Python dependencies. The policy file defines security thresholds and reporting preferences.

{
  "security": {
    "ignore-cvss-severity-below": 7.0,
    "ignore-cvss-unknown-severity": false,
    "continue-on-vulnerability-error": false
  },
  "alert": {
    "ignore-unpinned-requirements": false
  },
  "report": {
    "only-report": false,
    "output": {
      "format": "json",
      "file": "/opt/airflow/logs/safety-report.json"
    }
  },
  "ignore": {
    "vulnerabilities": [],
    "packages": []
  }
}

Create DAG security scanning script

Create an automated script that scans all DAG files for security issues using both Bandit and Safety.

#!/usr/bin/env python3
"""Airflow DAG Security Scanner"""

import os
import sys
import json
import subprocess
import argparse
from pathlib import Path
from datetime import datetime

class DAGSecurityScanner:
    def __init__(self, dag_dir, config_dir):
        self.dag_dir = Path(dag_dir)
        self.config_dir = Path(config_dir)
        self.bandit_config = self.config_dir / "bandit.yaml"
        self.safety_policy = self.config_dir / ".safety-policy.json"
        self.results = {"timestamp": datetime.now().isoformat(), "scans": {}}
    
    def run_bandit_scan(self):
        """Run Bandit security scan on DAG files"""
        print("Running Bandit security scan...")
        cmd = [
            "bandit",
            "-r", str(self.dag_dir),
            "-f", "json",
            "-c", str(self.bandit_config),
            "-ll",  # Only report medium and high severity
            "-i"    # Show confidence levels
        ]
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            if result.returncode == 0:
                self.results["scans"]["bandit"] = {
                    "status": "clean",
                    "issues_found": 0,
                    "output": json.loads(result.stdout) if result.stdout else {}
                }
                print("✓ Bandit scan completed - no issues found")
            else:
                issues_data = json.loads(result.stdout) if result.stdout else {}
                issue_count = len(issues_data.get("results", []))
                self.results["scans"]["bandit"] = {
                    "status": "issues_found",
                    "issues_found": issue_count,
                    "output": issues_data
                }
                print(f"⚠ Bandit scan found {issue_count} security issues")
                return False
        except Exception as e:
            print(f"✗ Bandit scan failed: {e}")
            self.results["scans"]["bandit"] = {"status": "error", "error": str(e)}
            return False
        
        return True
    
    def run_safety_scan(self):
        """Run Safety vulnerability scan"""
        print("Running Safety vulnerability scan...")
        cmd = [
            "safety", "check",
            "--json",
            "--policy-file", str(self.safety_policy)
        ]
        
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            if result.returncode == 0:
                self.results["scans"]["safety"] = {
                    "status": "clean",
                    "vulnerabilities_found": 0,
                    "output": json.loads(result.stdout) if result.stdout else []
                }
                print("✓ Safety scan completed - no vulnerabilities found")
            else:
                vulns_data = json.loads(result.stdout) if result.stdout else []
                vuln_count = len(vulns_data)
                self.results["scans"]["safety"] = {
                    "status": "vulnerabilities_found",
                    "vulnerabilities_found": vuln_count,
                    "output": vulns_data
                }
                print(f"⚠ Safety scan found {vuln_count} vulnerabilities")
                return False
        except Exception as e:
            print(f"✗ Safety scan failed: {e}")
            self.results["scans"]["safety"] = {"status": "error", "error": str(e)}
            return False
        
        return True
    
    def generate_report(self, output_file=None):
        """Generate security scan report"""
        if output_file:
            with open(output_file, 'w') as f:
                json.dump(self.results, f, indent=2)
            print(f"Report saved to {output_file}")
        else:
            print(json.dumps(self.results, indent=2))
    
    def scan_all(self, report_file=None):
        """Run all security scans"""
        print(f"Starting security scan of DAGs in {self.dag_dir}")
        bandit_ok = self.run_bandit_scan()
        safety_ok = self.run_safety_scan()
        
        self.generate_report(report_file)
        
        if bandit_ok and safety_ok:
            print("\n✓ All security scans passed")
            return True
        else:
            print("\n✗ Security issues found - check report for details")
            return False

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scan Airflow DAGs for security issues")
    parser.add_argument("--dag-dir", default="/opt/airflow/dags", help="DAG directory path")
    parser.add_argument("--config-dir", default="/opt/airflow", help="Configuration directory")
    parser.add_argument("--report", help="Output report file path")
    
    args = parser.parse_args()
    
    scanner = DAGSecurityScanner(args.dag_dir, args.config_dir)
    success = scanner.scan_all(args.report)
    sys.exit(0 if success else 1)

Make the scanner executable and create log directory

Set proper permissions and create required directories for security scanning logs.

chmod +x /opt/airflow/scripts/scan_dag_security.py
mkdir -p /opt/airflow/logs/security
chown -R airflow:airflow /opt/airflow/scripts /opt/airflow/logs/security
Never use chmod 777. The script only needs execute permission (755) and logs directory needs write access for the airflow user (775).

Configure pre-commit hooks

Set up pre-commit hooks to automatically run security scans before commits. This prevents insecure code from entering your repository.

repos:
  - repo: local
    hooks:
      - id: bandit-security-scan
        name: Bandit Security Scan
        entry: /opt/airflow-security/bin/bandit
        language: system
        args: [-r, dags/, -f, json, -c, bandit.yaml, -ll]
        files: \.py$
        exclude: __pycache__|test_.*\.py$
        
      - id: safety-vulnerability-scan
        name: Safety Vulnerability Scan
        entry: /opt/airflow-security/bin/safety
        language: system
        args: [check, --json, --policy-file, .safety-policy.json]
        pass_filenames: false
        
      - id: dag-security-validation
        name: DAG Security Validation
        entry: python3
        language: system
        args: [/opt/airflow/scripts/scan_dag_security.py, --dag-dir, dags/]
        files: dags/.*\.py$
        pass_filenames: false
        
  - repo: https://github.com/psf/black
    rev: 23.12.1
    hooks:
      - id: black
        files: dags/.*\.py$
        
  - repo: https://github.com/pycqa/flake8
    rev: 6.1.0
    hooks:
      - id: flake8
        files: dags/.*\.py$
        args: [--max-line-length=88, --extend-ignore=E203,W503]

Install pre-commit hooks

Initialize pre-commit in your Airflow repository to enable automatic security scanning.

cd /opt/airflow
source /opt/airflow-security/bin/activate
pre-commit install
pre-commit run --all-files

Create custom security rules

Add custom Bandit plugins for Airflow-specific security checks. This file creates rules for common DAG security issues.

"""Custom Bandit plugins for Airflow DAG security"""

import ast
from bandit.core import test_properties
from bandit.core import utils

@test_properties.test_id('B901')
@test_properties.checks('Call')
def check_airflow_variable_plaintext(context):
    """Check for plaintext Airflow Variables instead of encrypted ones"""
    if context.call_function_name_qual == 'Variable.get':
        args = context.node.args
        keywords = context.node.keywords
        
        # Check if deserialize_json=True is used (potential security issue)
        for kw in keywords:
            if (kw.arg == 'deserialize_json' and 
                isinstance(kw.value, ast.Constant) and kw.value.value is True):
                return bandit.Issue(
                    severity=bandit.MEDIUM,
                    confidence=bandit.HIGH,
                    text="Airflow Variable with deserialize_json=True may expose secrets",
                    lineno=context.node.lineno
                )
    return None

@test_properties.test_id('B902')
@test_properties.checks('Call')
def check_airflow_connection_password(context):
    """Check for hardcoded passwords in Airflow Connections"""
    if context.call_function_name_qual in ['Connection', 'BaseHook.get_connection']:
        keywords = context.node.keywords
        
        for kw in keywords:
            if kw.arg == 'password' and isinstance(kw.value, ast.Constant):
                if isinstance(kw.value.value, str) and len(kw.value.value) > 0:
                    return bandit.Issue(
                        severity=bandit.HIGH,
                        confidence=bandit.HIGH,
                        text="Hardcoded password in Airflow Connection",
                        lineno=context.node.lineno
                    )
    return None

@test_properties.test_id('B903')
@test_properties.checks('Call')
def check_bash_operator_injection(context):
    """Check for potential command injection in BashOperator"""
    if 'BashOperator' in context.call_function_name_qual:
        keywords = context.node.keywords
        
        for kw in keywords:
            if kw.arg == 'bash_command':
                if isinstance(kw.value, ast.JoinedStr):  # f-string
                    return bandit.Issue(
                        severity=bandit.MEDIUM,
                        confidence=bandit.MEDIUM,
                        text="BashOperator with f-string may be vulnerable to injection",
                        lineno=context.node.lineno
                    )
                elif (isinstance(kw.value, ast.BinOp) and 
                      isinstance(kw.value.op, ast.Mod)):  # % formatting
                    return bandit.Issue(
                        severity=bandit.MEDIUM,
                        confidence=bandit.MEDIUM,
                        text="BashOperator with % formatting may be vulnerable to injection",
                        lineno=context.node.lineno
                    )
    return None

Configure custom security exceptions

Create a configuration file for managing security exceptions and false positives specific to your DAG patterns.

# Security scanning exceptions for Airflow DAGs
exceptions:
  bandit:
    # Skip assert_used in test DAGs
    B101:
      files:
        - "**/test_*.py"
        - "**/*_test.py"
      reason: "Assert statements acceptable in test files"
    
    # Skip hardcoded_password_string for specific patterns
    B105:
      patterns:
        - "default_password = None"
        - "password = ''"
      reason: "Default/empty password values are acceptable"
    
    # Skip subprocess_popen_with_shell_equals_true for specific cases
    B602:
      files:
        - "dags/legacy/*.py"
      reason: "Legacy DAGs with approved shell usage"
  
  safety:
    # Ignore specific CVEs that don't affect our usage
    ignore_vulnerabilities:
      - "51668"  # Example: ignore specific CVE if not applicable
    
    # Ignore packages we can't upgrade due to Airflow constraints
    ignore_packages:
      - "apache-airflow"  # Managed separately

Severity thresholds

severity_levels: bandit: fail_on: - "HIGH" warn_on: - "MEDIUM" ignore: - "LOW" safety: fail_on_cvss_above: 7.0 warn_on_cvss_above: 4.0

Create CI/CD pipeline integration script

This script integrates security scanning into your CI/CD pipeline, providing exit codes and detailed reporting for automated deployments.

#!/bin/bash

CI/CD Security Check Script for Airflow DAGs

set -euo pipefail

Configuration

DAG_DIR="${DAG_DIR:-/opt/airflow/dags}" CONFIG_DIR="${CONFIG_DIR:-/opt/airflow}" REPORT_DIR="${REPORT_DIR:-/opt/airflow/logs/security}" FAIL_ON_ISSUES="${FAIL_ON_ISSUES:-true}" SLACK_WEBHOOK="${SLACK_WEBHOOK:-}"

Colors for output

RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' NC='\033[0m' # No Color

Create report directory

mkdir -p "$REPORT_DIR" log() { echo -e "[$(date '+%Y-%m-%d %H:%M:%S')] $1" } log_error() { echo -e "${RED}[ERROR]${NC} $1" >&2 } log_warning() { echo -e "${YELLOW}[WARNING]${NC} $1" } log_success() { echo -e "${GREEN}[SUCCESS]${NC} $1" } send_slack_notification() { local message="$1" local color="$2" if [[ -n "$SLACK_WEBHOOK" ]]; then curl -X POST -H 'Content-type: application/json' \ --data "{\"attachments\":[{\"color\":\"$color\",\"text\":\"$message\"}]}" \ "$SLACK_WEBHOOK" || log_warning "Failed to send Slack notification" fi } run_security_scan() { log "Starting Airflow DAG security scan" local report_file="$REPORT_DIR/security-scan-$(date +%Y%m%d-%H%M%S).json" local exit_code=0 # Activate virtual environment source /opt/airflow-security/bin/activate # Run the security scanner if python3 "$CONFIG_DIR/scripts/scan_dag_security.py" \ --dag-dir "$DAG_DIR" \ --config-dir "$CONFIG_DIR" \ --report "$report_file"; then log_success "Security scan completed successfully" send_slack_notification "✅ Airflow DAG security scan passed" "good" else exit_code=1 log_error "Security scan found issues" # Parse and display summary if [[ -f "$report_file" ]]; then local bandit_issues=$(jq -r '.scans.bandit.issues_found // 0' "$report_file") local safety_vulns=$(jq -r '.scans.safety.vulnerabilities_found // 0' "$report_file") log_error "Summary: $bandit_issues Bandit issues, $safety_vulns Safety vulnerabilities" send_slack_notification "❌ Airflow DAG security scan failed: $bandit_issues Bandit issues, $safety_vulns vulnerabilities" "danger" # Display detailed issues if [[ $bandit_issues -gt 0 ]]; then log "Bandit Issues:" jq -r '.scans.bandit.output.results[]? | "- \(.test_name): \(.issue_text) (\(.filename):\(.line_number))"' "$report_file" || true fi if [[ $safety_vulns -gt 0 ]]; then log "Safety Vulnerabilities:" jq -r '.scans.safety.output[]? | "- \(.package): \(.vulnerability) (\(.id))"' "$report_file" || true fi fi fi # Cleanup old reports (keep last 10) find "$REPORT_DIR" -name "security-scan-*.json" -type f | sort | head -n -10 | xargs rm -f return $exit_code } validate_dag_files() { log "Validating DAG file syntax" local dag_errors=0 while IFS= read -r -d '' dag_file; do if ! python3 -m py_compile "$dag_file"; then log_error "Syntax error in $dag_file" ((dag_errors++)) fi done < <(find "$DAG_DIR" -name "*.py" -print0) if [[ $dag_errors -eq 0 ]]; then log_success "All DAG files have valid syntax" else log_error "Found $dag_errors DAG files with syntax errors" return 1 fi } main() { log "Starting CI/CD security check for Airflow DAGs" local overall_exit=0 # Validate DAG syntax first if ! validate_dag_files; then overall_exit=1 fi # Run security scans if ! run_security_scan; then overall_exit=1 fi if [[ $overall_exit -eq 0 ]]; then log_success "All security checks passed" else log_error "Security checks failed" if [[ "$FAIL_ON_ISSUES" == "true" ]]; then log "Failing build due to security issues (FAIL_ON_ISSUES=true)" exit 1 else log_warning "Continuing despite security issues (FAIL_ON_ISSUES=false)" fi fi } main "$@"

Make CI script executable and test

Set proper permissions and run an initial test of the security scanning pipeline.

chmod +x /opt/airflow/scripts/ci_security_check.sh
chown airflow:airflow /opt/airflow/scripts/ci_security_check.sh

Configure systemd timer for scheduled scans

Set up automated daily security scans using systemd timers to continuously monitor your DAG security.

[Unit]
Description=Airflow DAG Security Scan
After=network.target

[Service]
Type=oneshot
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow
Environment=PYTHONPATH=/opt/airflow
ExecStart=/opt/airflow/scripts/ci_security_check.sh
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

Configure systemd timer

Create the timer configuration to run security scans daily at 2 AM.

[Unit]
Description=Run Airflow DAG Security Scan Daily
Requires=airflow-security-scan.service

[Timer]
OnCalendar=daily
Persistent=true
RandomizedDelaySec=300

[Install]
WantedBy=timers.target

Enable and start the security scan timer

Enable the systemd timer to run automated daily security scans.

sudo systemctl daemon-reload
sudo systemctl enable --now airflow-security-scan.timer
sudo systemctl status airflow-security-scan.timer

Verify your setup

Test all components of your security scanning setup to ensure they're working correctly.

# Test the security scanner directly
cd /opt/airflow
source /opt/airflow-security/bin/activate
python3 scripts/scan_dag_security.py --dag-dir dags/ --report logs/test-scan.json

Test pre-commit hooks

pre-commit run --all-files

Check if the systemd timer is running

sudo systemctl list-timers airflow-security-scan.timer

Manual run of CI security check

./scripts/ci_security_check.sh

Verify Bandit configuration

bandit --config-file bandit.yaml --help

Check Safety policy

safety check --policy-file .safety-policy.json --short-report

Configure advanced security rules

Create DAG-specific security patterns

Add custom security patterns that are specific to common Airflow DAG vulnerabilities. This helps by providing comprehensive DAG security coverage.

# Custom security patterns for Airflow DAGs
patterns:
  high_risk:
    - pattern: "os\.system\("
      message: "Avoid os.system() in DAGs, use BashOperator or subprocess with shell=False"
      severity: "HIGH"
    
    - pattern: "eval\("
      message: "eval() function can execute arbitrary code"
      severity: "HIGH"
    
    - pattern: "exec\("
      message: "exec() function can execute arbitrary code"
      severity: "HIGH"
    
    - pattern: "Variable\.get\([^,],\sNone\)"
      message: "Airflow Variable without default value may cause DAG failures"
      severity: "MEDIUM"
  
  medium_risk:
    - pattern: "password\s=\s['\"][^'\"]+['\"]"
      message: "Hardcoded password detected, use Airflow Connections or Variables"
      severity: "MEDIUM"
    
    - pattern: "api_key\s=\s['\"][^'\"]+['\"]"
      message: "Hardcoded API key detected, use Airflow Connections"
      severity: "MEDIUM"
    
    - pattern: "subprocess\.(call|run|check_output)\([^)]*shell=True"
      message: "subprocess with shell=True may be vulnerable to injection"
      severity: "MEDIUM"
  
  airflow_specific:
    - pattern: "PythonOperator\([^)]*python_callable=eval"
      message: "PythonOperator with eval is dangerous"
      severity: "HIGH"
    
    - pattern: "BashOperator\([^)]bash_command=.\{\{"
      message: "BashOperator with Jinja templating needs validation"
      severity: "MEDIUM"
    
    - pattern: "conn\.get_password\(\)"
      message: "Direct password access - ensure proper error handling"
      severity: "LOW"

Common issues

SymptomCauseFix
Bandit fails with "No module named bandit"Virtual environment not activatedsource /opt/airflow-security/bin/activate
Safety scan times outNetwork connectivity issuesCheck firewall rules: curl -I https://pypi.org
Pre-commit hooks not runningHooks not installed in repositorycd /opt/airflow && pre-commit install
Permission denied on log filesWrong ownership on log directorychown -R airflow:airflow /opt/airflow/logs
Custom Bandit plugins not loadingPYTHONPATH not set correctlyexport PYTHONPATH=/opt/airflow/security_plugins:$PYTHONPATH
Systemd timer not runningTimer not enabledsudo systemctl enable --now airflow-security-scan.timer

Integration with existing workflows

This security scanning setup integrates well with Airflow DAG version control workflows and can be enhanced with comprehensive data governance policies for complete security coverage.

Next steps

Running this in production?

Want this handled for you? Setting this up once is straightforward. Keeping it patched, monitored, backed up and tuned across environments is the harder part. See how we run infrastructure like this for European SaaS and e-commerce teams.

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We handle infrastructure security hardening for businesses that depend on uptime. From initial setup to ongoing operations.