Set up automated security scanning for Apache Airflow DAGs using Bandit for Python code analysis and Safety for vulnerability detection. Configure pre-commit hooks and CI/CD integration for continuous security monitoring.
Prerequisites
- Apache Airflow installation
- Python 3.8 or later
- Git repository for DAGs
- Basic Linux system administration
What this solves
Apache Airflow DAGs often contain sensitive operations like database connections, API calls, and data processing logic that can introduce security vulnerabilities. This tutorial implements automated security scanning using Bandit for Python security issues and Safety for known vulnerability detection, ensuring your DAGs meet security standards before deployment.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions of Python and security tools.
# Refresh the package index and apply pending upgrades
sudo apt update && sudo apt upgrade -y
# Install pip, the venv module and git
sudo apt install -y python3-pip python3-venv git
Install Bandit and Safety security scanners
Install Bandit for Python security scanning and Safety for vulnerability detection. Create a virtual environment to isolate the security tools.
# Create an isolated virtual environment for the scanning tools
python3 -m venv /opt/airflow-security
source /opt/airflow-security/bin/activate
# Quote the extra: shells with bracket globbing (e.g. zsh) would otherwise
# try to expand "bandit[toml]" as a filename pattern
pip install "bandit[toml]" safety pre-commit gitpython
Create Bandit configuration file
Configure Bandit with custom rules for Airflow DAGs. This configuration excludes test files and focuses on DAG-specific security issues.
# Bandit configuration for Airflow DAGs.
# The scanner and the pre-commit hook load this file as "bandit.yaml"
# (bandit -c bandit.yaml), so it must be YAML rather than INI/TOML.
exclude_dirs:
  - "/opt/airflow/logs"
  - "/opt/airflow/plugins/__pycache__"

# Test IDs that are never reported
skips:
  - "B101"
  - "B601"

# File globs in which assert statements are tolerated
assert_used:
  skips:
    - "*_test.py"
    - "*/test_*.py"

hardcoded_password_string:
  word_list: ["password", "pass", "passwd", "pwd", "secret", "token"]

hardcoded_password_funcarg:
  word_list: ["password", "pass", "passwd", "pwd", "secret", "token"]

hardcoded_password_default:
  word_list: ["password", "pass", "passwd", "pwd", "secret", "token"]

sql_injection:
  sql_statements: ["select", "insert", "update", "delete", "create", "drop"]
  qualname_patterns: ["*.execute", "*.executemany", "*.cursor"]

shell_injection:
  shell_wraps: ["os.system", "os.popen", "subprocess.call", "subprocess.run"]
  subprocess_without_shell_equals_true:
    - "subprocess.Popen"
    - "subprocess.call"
    - "subprocess.check_call"
    - "subprocess.check_output"
    - "subprocess.run"
Create Safety configuration
Configure Safety to check for known vulnerabilities in Python dependencies. The policy file defines security thresholds and reporting preferences.
{
"security": {
"ignore-cvss-severity-below": 7.0,
"ignore-cvss-unknown-severity": false,
"continue-on-vulnerability-error": false
},
"alert": {
"ignore-unpinned-requirements": false
},
"report": {
"only-report": false,
"output": {
"format": "json",
"file": "/opt/airflow/logs/safety-report.json"
}
},
"ignore": {
"vulnerabilities": [],
"packages": []
}
}
Create DAG security scanning script
Create an automated script that scans all DAG files for security issues using both Bandit and Safety.
#!/usr/bin/env python3
"""Airflow DAG Security Scanner"""
import os
import sys
import json
import subprocess
import argparse
from pathlib import Path
from datetime import datetime
class DAGSecurityScanner:
    """Run Bandit and Safety and collect their results in one report.

    Each scan stores a dict under ``self.results["scans"][<tool>]`` with a
    ``status`` of ``"clean"``, ``"issues_found"`` /
    ``"vulnerabilities_found"``, or ``"error"``.
    """

    def __init__(self, dag_dir, config_dir):
        """Remember the DAG directory and derive config paths from config_dir."""
        self.dag_dir = Path(dag_dir)
        self.config_dir = Path(config_dir)
        self.bandit_config = self.config_dir / "bandit.yaml"
        self.safety_policy = self.config_dir / ".safety-policy.json"
        self.results = {"timestamp": datetime.now().isoformat(), "scans": {}}

    @staticmethod
    def _count_vulnerabilities(payload):
        """Return the number of vulnerabilities in parsed Safety JSON output.

        Safety has emitted both a bare list of findings and a dict carrying
        the findings under a "vulnerabilities" key; calling len() on the dict
        form would count its top-level keys instead of the findings.
        """
        if isinstance(payload, dict):
            return len(payload.get("vulnerabilities", []))
        return len(payload)

    def run_bandit_scan(self):
        """Run Bandit on the DAG directory.

        Returns True only when Bandit exits 0; issues and tool failures both
        return False and are recorded in ``self.results``.
        """
        print("Running Bandit security scan...")
        cmd = [
            "bandit",
            "-r", str(self.dag_dir),
            "-f", "json",
            "-c", str(self.bandit_config),
            "-ll",  # Only report medium and high severity
            "-i",   # Report findings of low confidence or higher
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            report = json.loads(result.stdout) if result.stdout else {}
        except (OSError, ValueError) as e:
            # OSError: bandit could not be started; ValueError: output was not JSON
            print(f"✗ Bandit scan failed: {e}")
            self.results["scans"]["bandit"] = {"status": "error", "error": str(e)}
            return False

        if result.returncode == 0:
            self.results["scans"]["bandit"] = {
                "status": "clean",
                "issues_found": 0,
                "output": report,
            }
            print("✓ Bandit scan completed - no issues found")
            return True

        if "results" not in report:
            # Non-zero exit without a findings list: Bandit itself failed
            # (e.g. unreadable config), which is not the same as "0 issues".
            message = result.stderr.strip() or f"bandit exited with code {result.returncode}"
            print(f"✗ Bandit scan failed: {message}")
            self.results["scans"]["bandit"] = {"status": "error", "error": message}
            return False

        issue_count = len(report["results"])
        self.results["scans"]["bandit"] = {
            "status": "issues_found",
            "issues_found": issue_count,
            "output": report,
        }
        print(f"⚠ Bandit scan found {issue_count} security issues")
        return False

    def run_safety_scan(self):
        """Run ``safety check`` with the configured policy file.

        Returns True only when Safety exits 0; vulnerabilities and tool
        failures both return False and are recorded in ``self.results``.
        """
        print("Running Safety vulnerability scan...")
        cmd = [
            "safety", "check",
            "--json",
            "--policy-file", str(self.safety_policy),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            report = json.loads(result.stdout) if result.stdout else []
        except (OSError, ValueError) as e:
            print(f"✗ Safety scan failed: {e}")
            self.results["scans"]["safety"] = {"status": "error", "error": str(e)}
            return False

        if result.returncode == 0:
            self.results["scans"]["safety"] = {
                "status": "clean",
                "vulnerabilities_found": 0,
                "output": report,
            }
            print("✓ Safety scan completed - no vulnerabilities found")
            return True

        if not result.stdout:
            # Non-zero exit with no report at all: the tool failed to run
            message = result.stderr.strip() or f"safety exited with code {result.returncode}"
            print(f"✗ Safety scan failed: {message}")
            self.results["scans"]["safety"] = {"status": "error", "error": message}
            return False

        vuln_count = self._count_vulnerabilities(report)
        self.results["scans"]["safety"] = {
            "status": "vulnerabilities_found",
            "vulnerabilities_found": vuln_count,
            "output": report,
        }
        print(f"⚠ Safety scan found {vuln_count} vulnerabilities")
        return False

    def generate_report(self, output_file=None):
        """Write the collected results as JSON to output_file, or print them."""
        if output_file:
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(self.results, f, indent=2)
            print(f"Report saved to {output_file}")
        else:
            print(json.dumps(self.results, indent=2))

    def scan_all(self, report_file=None):
        """Run both scans, emit the report, and return True if both passed."""
        print(f"Starting security scan of DAGs in {self.dag_dir}")
        # Run both scans unconditionally so the report always covers both tools
        bandit_ok = self.run_bandit_scan()
        safety_ok = self.run_safety_scan()
        self.generate_report(report_file)
        if bandit_ok and safety_ok:
            print("\n✓ All security scans passed")
            return True
        print("\n✗ Security issues found - check report for details")
        return False
if __name__ == "__main__":
    # Command-line entry point: the exit status mirrors the scan outcome.
    cli = argparse.ArgumentParser(description="Scan Airflow DAGs for security issues")
    cli.add_argument("--dag-dir", default="/opt/airflow/dags", help="DAG directory path")
    cli.add_argument("--config-dir", default="/opt/airflow", help="Configuration directory")
    cli.add_argument("--report", help="Output report file path")
    options = cli.parse_args()

    passed = DAGSecurityScanner(options.dag_dir, options.config_dir).scan_all(options.report)
    if passed:
        sys.exit(0)
    sys.exit(1)
Make the scanner executable and create log directory
Set proper permissions and create required directories for security scanning logs.
# Allow the scanner script to be executed directly
chmod +x /opt/airflow/scripts/scan_dag_security.py
# Directory that receives the security scan reports
mkdir -p /opt/airflow/logs/security
# Hand the scripts and the report directory to the airflow user
chown -R airflow:airflow /opt/airflow/scripts /opt/airflow/logs/security
Configure pre-commit hooks
Set up pre-commit hooks to automatically run security scans before commits. This prevents insecure code from entering your repository.
# pre-commit configuration: security scans first, then formatting and linting
repos:
  - repo: local
    hooks:
      # Static analysis of the DAG sources with Bandit
      - id: bandit-security-scan
        name: Bandit Security Scan
        entry: /opt/airflow-security/bin/bandit
        language: system
        args: [-r, dags/, -f, json, -c, bandit.yaml, -ll]
        files: \.py$
        exclude: __pycache__|test_.*\.py$
      # Known-vulnerability check of installed dependencies
      - id: safety-vulnerability-scan
        name: Safety Vulnerability Scan
        entry: /opt/airflow-security/bin/safety
        language: system
        args: [check, --json, --policy-file, .safety-policy.json]
        pass_filenames: false
      # Combined scanner script
      - id: dag-security-validation
        name: DAG Security Validation
        entry: python3
        language: system
        args: [/opt/airflow/scripts/scan_dag_security.py, --dag-dir, dags/]
        files: dags/.*\.py$
        pass_filenames: false
  - repo: https://github.com/psf/black
    rev: 23.12.1
    hooks:
      - id: black
        files: dags/.*\.py$
  - repo: https://github.com/pycqa/flake8
    rev: 6.1.0
    hooks:
      - id: flake8
        files: dags/.*\.py$
        # Quoted: an unquoted comma inside a YAML flow sequence would split
        # "--extend-ignore=E203,W503" into two separate arguments
        args: ["--max-line-length=88", "--extend-ignore=E203,W503"]
Install pre-commit hooks
Initialize pre-commit in your Airflow repository to enable automatic security scanning.
# Work from the repository that holds the DAGs
cd /opt/airflow
source /opt/airflow-security/bin/activate
# Register the git hook, then run every hook once against all files
pre-commit install
pre-commit run --all-files
Create custom security rules
Add custom Bandit plugins for Airflow-specific security checks. This file creates rules for common DAG security issues.
"""Custom Bandit plugins for Airflow DAG security"""
import ast

import bandit
from bandit.core import test_properties
from bandit.core import utils
@test_properties.test_id('B901')
@test_properties.checks('Call')
def check_airflow_variable_plaintext(context):
    """Flag ``Variable.get(..., deserialize_json=True)`` calls.

    Returns a MEDIUM-severity issue when the keyword is the literal ``True``,
    otherwise ``None``.
    """
    qualname = context.call_function_name_qual or ''
    # Bandit resolves import aliases, so the qualified name may carry a module
    # prefix (e.g. "airflow.models.Variable.get"); match on the suffix.
    if qualname != 'Variable.get' and not qualname.endswith('.Variable.get'):
        return None

    for kw in context.node.keywords:
        if (kw.arg == 'deserialize_json'
                and isinstance(kw.value, ast.Constant)
                and kw.value.value is True):
            return bandit.Issue(
                severity=bandit.MEDIUM,
                confidence=bandit.HIGH,
                text="Airflow Variable with deserialize_json=True may expose secrets",
                lineno=context.node.lineno
            )
    return None
@test_properties.test_id('B902')
@test_properties.checks('Call')
def check_airflow_connection_password(context):
    """Flag a non-empty string literal passed as ``password=`` to a Connection.

    Returns a HIGH-severity issue on a match, otherwise ``None``.
    """
    qualname = context.call_function_name_qual or ''
    targets = ('Connection', 'BaseHook.get_connection')
    # Accept both the bare name and a module-qualified name, since Bandit
    # expands import aliases (e.g. "airflow.models.Connection").
    if not any(qualname == name or qualname.endswith('.' + name) for name in targets):
        return None

    for kw in context.node.keywords:
        if kw.arg != 'password' or not isinstance(kw.value, ast.Constant):
            continue
        # Only a non-empty string literal counts as a hardcoded password
        if isinstance(kw.value.value, str) and len(kw.value.value) > 0:
            return bandit.Issue(
                severity=bandit.HIGH,
                confidence=bandit.HIGH,
                text="Hardcoded password in Airflow Connection",
                lineno=context.node.lineno
            )
    return None
@test_properties.test_id('B903')
@test_properties.checks('Call')
def check_bash_operator_injection(context):
    """Flag ``BashOperator`` calls whose ``bash_command`` is built dynamically.

    Reports f-strings, ``%`` formatting and ``str.format()`` calls as
    MEDIUM-severity issues; returns ``None`` for anything else.
    """
    if 'BashOperator' not in (context.call_function_name_qual or ''):
        return None

    for kw in context.node.keywords:
        if kw.arg != 'bash_command':
            continue
        value = kw.value
        if isinstance(value, ast.JoinedStr):  # f-string
            return bandit.Issue(
                severity=bandit.MEDIUM,
                confidence=bandit.MEDIUM,
                text="BashOperator with f-string may be vulnerable to injection",
                lineno=context.node.lineno
            )
        if isinstance(value, ast.BinOp) and isinstance(value.op, ast.Mod):  # % formatting
            return bandit.Issue(
                severity=bandit.MEDIUM,
                confidence=bandit.MEDIUM,
                text="BashOperator with % formatting may be vulnerable to injection",
                lineno=context.node.lineno
            )
        if (isinstance(value, ast.Call)
                and isinstance(value.func, ast.Attribute)
                and value.func.attr == 'format'):  # "...".format(...)
            return bandit.Issue(
                severity=bandit.MEDIUM,
                confidence=bandit.MEDIUM,
                text="BashOperator with str.format() may be vulnerable to injection",
                lineno=context.node.lineno
            )
    return None
Configure custom security exceptions
Create a configuration file for managing security exceptions and false positives specific to your DAG patterns.
# Security scanning exceptions for Airflow DAGs
exceptions:
  bandit:
    # Skip assert_used in test DAGs
    B101:
      files:
        - "**/test_*.py"
        - "**/*_test.py"
      reason: "Assert statements acceptable in test files"
    # Skip hardcoded_password_string for specific patterns
    B105:
      patterns:
        - "default_password = None"
        - "password = ''"
      reason: "Default/empty password values are acceptable"
    # Skip subprocess_popen_with_shell_equals_true for specific cases
    B602:
      files:
        - "dags/legacy/*.py"
      reason: "Legacy DAGs with approved shell usage"
  safety:
    # Ignore specific CVEs that don't affect our usage
    ignore_vulnerabilities:
      - "51668"  # Example: ignore specific CVE if not applicable
    # Ignore packages we can't upgrade due to Airflow constraints
    ignore_packages:
      - "apache-airflow"  # Managed separately

# Severity thresholds
severity_levels:
  bandit:
    fail_on:
      - "HIGH"
    warn_on:
      - "MEDIUM"
    ignore:
      - "LOW"
  safety:
    fail_on_cvss_above: 7.0
    warn_on_cvss_above: 4.0
Create CI/CD pipeline integration script
This script integrates security scanning into your CI/CD pipeline, providing exit codes and detailed reporting for automated deployments.
#!/bin/bash
# CI/CD Security Check Script for Airflow DAGs
set -euo pipefail

# Configuration (each value can be overridden from the environment)
DAG_DIR="${DAG_DIR:-/opt/airflow/dags}"
CONFIG_DIR="${CONFIG_DIR:-/opt/airflow}"
REPORT_DIR="${REPORT_DIR:-/opt/airflow/logs/security}"
FAIL_ON_ISSUES="${FAIL_ON_ISSUES:-true}"
SLACK_WEBHOOK="${SLACK_WEBHOOK:-}"

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Create report directory
mkdir -p "$REPORT_DIR"
# Timestamped informational message on stdout
log() {
    printf '%b\n' "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# Red [ERROR] message on stderr
log_error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1" >&2
}

# Yellow [WARNING] message on stdout
log_warning() {
    printf '%b\n' "${YELLOW}[WARNING]${NC} $1"
}

# Green [SUCCESS] message on stdout
log_success() {
    printf '%b\n' "${GREEN}[SUCCESS]${NC} $1"
}
# Post a message to Slack when SLACK_WEBHOOK is set; otherwise do nothing.
#   $1 - message text
#   $2 - attachment color (e.g. "good", "danger")
# Delivery problems are logged as warnings and never fail the caller.
send_slack_notification() {
    local message="$1"
    local color="$2"
    [[ -n "$SLACK_WEBHOOK" ]] || return 0

    # Let jq build the payload so quotes, backslashes and newlines in the
    # message are escaped correctly instead of producing invalid JSON.
    local payload
    if ! payload=$(jq -n --arg color "$color" --arg text "$message" \
        '{attachments: [{color: $color, text: $text}]}'); then
        log_warning "Failed to build Slack notification payload"
        return 0
    fi

    curl -fsS --max-time 10 -X POST -H 'Content-type: application/json' \
        --data "$payload" \
        "$SLACK_WEBHOOK" >/dev/null || log_warning "Failed to send Slack notification"
}
# Run the Python scanner, print a summary of its JSON report and prune old
# reports. Returns 0 when the scanner passed and 1 otherwise.
run_security_scan() {
    log "Starting Airflow DAG security scan"
    # Declared separately from the assignment so "local" cannot mask a failure
    local report_file
    report_file="$REPORT_DIR/security-scan-$(date +%Y%m%d-%H%M%S).json"
    local exit_code=0

    # Activate virtual environment (location overridable via SECURITY_VENV)
    source "${SECURITY_VENV:-/opt/airflow-security}/bin/activate"

    # Run the security scanner
    if python3 "$CONFIG_DIR/scripts/scan_dag_security.py" \
        --dag-dir "$DAG_DIR" \
        --config-dir "$CONFIG_DIR" \
        --report "$report_file"; then
        log_success "Security scan completed successfully"
        send_slack_notification "✅ Airflow DAG security scan passed" "good"
    else
        exit_code=1
        log_error "Security scan found issues"

        # Parse and display summary
        if [[ -f "$report_file" ]]; then
            local bandit_issues safety_vulns
            # Fall back to 0 if the report cannot be parsed
            bandit_issues=$(jq -r '.scans.bandit.issues_found // 0' "$report_file") || bandit_issues=0
            safety_vulns=$(jq -r '.scans.safety.vulnerabilities_found // 0' "$report_file") || safety_vulns=0

            log_error "Summary: $bandit_issues Bandit issues, $safety_vulns Safety vulnerabilities"
            send_slack_notification "❌ Airflow DAG security scan failed: $bandit_issues Bandit issues, $safety_vulns vulnerabilities" "danger"

            # Display detailed issues
            if [[ $bandit_issues -gt 0 ]]; then
                log "Bandit Issues:"
                jq -r '.scans.bandit.output.results[]? | "- \(.test_name): \(.issue_text) (\(.filename):\(.line_number))"' "$report_file" || true
            fi
            if [[ $safety_vulns -gt 0 ]]; then
                log "Safety Vulnerabilities:"
                jq -r '.scans.safety.output[]? | "- \(.package): \(.vulnerability) (\(.id))"' "$report_file" || true
            fi
        fi
    fi

    # Cleanup old reports (keep last 10). NUL-delimited so unusual characters
    # in REPORT_DIR cannot split a path; -r skips rm when nothing is left.
    find "$REPORT_DIR" -name "security-scan-*.json" -type f -print0 \
        | sort -z | head -z -n -10 | xargs -0 -r rm -f

    return $exit_code
}
# Byte-compile every *.py file under DAG_DIR to catch syntax errors.
# Returns 1 if at least one file fails to compile.
validate_dag_files() {
    log "Validating DAG file syntax"
    local dag_errors=0

    while IFS= read -r -d '' dag_file; do
        if ! python3 -m py_compile "$dag_file"; then
            log_error "Syntax error in $dag_file"
            # Not ((dag_errors++)): that form returns status 1 when the old
            # value is 0, which aborts the script under "set -e".
            dag_errors=$((dag_errors + 1))
        fi
    done < <(find "$DAG_DIR" -name "*.py" -print0)

    if [[ $dag_errors -eq 0 ]]; then
        log_success "All DAG files have valid syntax"
    else
        log_error "Found $dag_errors DAG files with syntax errors"
        return 1
    fi
}
# Entry point: run the syntax check and the security scan, then decide
# whether the build fails based on FAIL_ON_ISSUES.
main() {
    log "Starting CI/CD security check for Airflow DAGs"
    local overall_exit=0

    # Validate DAG syntax first, then run security scans; both always run
    validate_dag_files || overall_exit=1
    run_security_scan || overall_exit=1

    if [[ $overall_exit -eq 0 ]]; then
        log_success "All security checks passed"
        return
    fi

    log_error "Security checks failed"
    if [[ "$FAIL_ON_ISSUES" != "true" ]]; then
        log_warning "Continuing despite security issues (FAIL_ON_ISSUES=false)"
        return
    fi
    log "Failing build due to security issues (FAIL_ON_ISSUES=true)"
    exit 1
}

main "$@"
Make CI script executable and test
Set proper permissions and run an initial test of the security scanning pipeline.
# Make the CI script executable and owned by the airflow user
chmod +x /opt/airflow/scripts/ci_security_check.sh
chown airflow:airflow /opt/airflow/scripts/ci_security_check.sh
Configure systemd timer for scheduled scans
Set up automated daily security scans using systemd timers to continuously monitor your DAG security.
# systemd service that performs one security scan run
[Unit]
Description=Airflow DAG Security Scan
After=network.target
[Service]
# oneshot: the unit is considered finished once the script exits
Type=oneshot
User=airflow
Group=airflow
WorkingDirectory=/opt/airflow
Environment=PYTHONPATH=/opt/airflow
ExecStart=/opt/airflow/scripts/ci_security_check.sh
# Send script output to the journal
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Configure systemd timer
Create the timer configuration to run security scans daily at 2 AM.
# systemd timer that triggers the security scan service once a day
[Unit]
Description=Run Airflow DAG Security Scan Daily

[Timer]
# Every day at 02:00 local time ("daily" alone would mean midnight)
OnCalendar=*-*-* 02:00:00
# Name the service to trigger explicitly. A Requires= dependency on the
# service would also start a scan every time the timer unit itself starts.
Unit=airflow-security-scan.service
# Catch up on a run that was missed while the machine was off
Persistent=true
RandomizedDelaySec=300

[Install]
WantedBy=timers.target
Enable and start the security scan timer
Enable the systemd timer to run automated daily security scans.
# Pick up the new unit files
sudo systemctl daemon-reload
# Enable the timer at boot and start it immediately
sudo systemctl enable --now airflow-security-scan.timer
sudo systemctl status airflow-security-scan.timer
Verify your setup
Test all components of your security scanning setup to ensure they're working correctly.
# Test the security scanner directly
cd /opt/airflow
source /opt/airflow-security/bin/activate
python3 scripts/scan_dag_security.py --dag-dir dags/ --report logs/test-scan.json

# Test pre-commit hooks
pre-commit run --all-files

# Check if the systemd timer is running
sudo systemctl list-timers airflow-security-scan.timer

# Manual run of CI security check
./scripts/ci_security_check.sh

# Verify Bandit configuration by running a scan with it
# (Bandit's option is -c/--configfile; "--config-file" is not recognised)
bandit -c bandit.yaml -r dags/ -ll

# Check Safety policy
safety check --policy-file .safety-policy.json --short-report
Configure advanced security rules
Create DAG-specific security patterns
Add custom security patterns that are specific to common Airflow DAG vulnerabilities. This helps by providing comprehensive DAG security coverage.
# Custom security patterns for Airflow DAGs
# Patterns are single-quoted so backslashes reach the regex engine unchanged
# (in double-quoted YAML, sequences such as "\." are invalid escapes).
# Inside single quotes a literal ' is written as ''.
patterns:
  high_risk:
    - pattern: 'os\.system\('
      message: "Avoid os.system() in DAGs, use BashOperator or subprocess with shell=False"
      severity: "HIGH"
    - pattern: 'eval\('
      message: "eval() function can execute arbitrary code"
      severity: "HIGH"
    - pattern: 'exec\('
      message: "exec() function can execute arbitrary code"
      severity: "HIGH"
    - pattern: 'Variable\.get\([^,]*,\s*None\)'
      message: "Airflow Variable without default value may cause DAG failures"
      severity: "MEDIUM"
  medium_risk:
    - pattern: 'password\s*=\s*[''"][^''"]+[''"]'
      message: "Hardcoded password detected, use Airflow Connections or Variables"
      severity: "MEDIUM"
    - pattern: 'api_key\s*=\s*[''"][^''"]+[''"]'
      message: "Hardcoded API key detected, use Airflow Connections"
      severity: "MEDIUM"
    - pattern: 'subprocess\.(call|run|check_output)\([^)]*shell=True'
      message: "subprocess with shell=True may be vulnerable to injection"
      severity: "MEDIUM"
  airflow_specific:
    - pattern: 'PythonOperator\([^)]*python_callable=eval'
      message: "PythonOperator with eval is dangerous"
      severity: "HIGH"
    - pattern: 'BashOperator\([^)]*bash_command=.*\{\{'
      message: "BashOperator with Jinja templating needs validation"
      severity: "MEDIUM"
    - pattern: 'conn\.get_password\(\)'
      message: "Direct password access - ensure proper error handling"
      severity: "LOW"
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Bandit fails with "No module named bandit" | Virtual environment not activated | source /opt/airflow-security/bin/activate |
| Safety scan times out | Network connectivity issues | Check firewall rules: curl -I https://pypi.org |
| Pre-commit hooks not running | Hooks not installed in repository | cd /opt/airflow && pre-commit install |
| Permission denied on log files | Wrong ownership on log directory | chown -R airflow:airflow /opt/airflow/logs |
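| Custom Bandit plugins not loading | Plugin module not importable, or its checks are not registered under the bandit.plugins entry point | export PYTHONPATH=/opt/airflow/security_plugins:$PYTHONPATH and register each check under the bandit.plugins entry point of an installed package |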
| Systemd timer not running | Timer not enabled | sudo systemctl enable --now airflow-security-scan.timer |
Integration with existing workflows
This security scanning setup integrates well with Airflow DAG version control workflows and can be enhanced with comprehensive data governance policies for complete security coverage.
Next steps
- Configure Airflow DAG security and secrets management
- Implement comprehensive Airflow DAG testing strategies
- Set up Airflow security monitoring with ELK stack
- Configure Airflow RBAC with LDAP authentication
- Implement Airflow DAG vulnerability management workflows
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
# Installer for the Airflow DAG security scanning tools.
# Abort on errors, on unset variables and on failures inside pipelines.
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Default values
# AIRFLOW_HOME is taken from the environment when it is already set
AIRFLOW_HOME="${AIRFLOW_HOME:-/opt/airflow}"
DAG_DIR="${AIRFLOW_HOME}/dags"
CONFIG_DIR="/etc/airflow-security"
SECURITY_VENV="/opt/airflow-security"
SECURITY_USER="airflow"
# Usage message
#   $1 - exit status (default 1, so error paths can call it without arguments)
usage() {
    echo "Usage: $0 [OPTIONS]"
    echo "Options:"
    echo " -h, --help Show this help message"
    echo " -d, --dag-dir PATH Airflow DAGs directory (default: ${DAG_DIR})"
    echo " -u, --user USER User to run security scans (default: ${SECURITY_USER})"
    echo " --airflow-home PATH Airflow home directory (default: ${AIRFLOW_HOME})"
    exit "${1:-1}"
}

# Parse arguments
# Remember an explicit --dag-dir so a later --airflow-home does not replace it
DAG_DIR_EXPLICIT=false
while [[ $# -gt 0 ]]; do
    case $1 in
        -h|--help)
            # Asking for help is not an error
            usage 0
            ;;
        -d|--dag-dir|-u|--user|--airflow-home)
            # Without this check a missing value would trip "set -u" on $2
            if [[ $# -lt 2 ]]; then
                echo -e "${RED}Error: Option $1 requires a value${NC}"
                usage
            fi
            case $1 in
                -d|--dag-dir)
                    DAG_DIR="$2"
                    DAG_DIR_EXPLICIT=true
                    ;;
                -u|--user)
                    SECURITY_USER="$2"
                    ;;
                --airflow-home)
                    AIRFLOW_HOME="$2"
                    if [[ "$DAG_DIR_EXPLICIT" == false ]]; then
                        DAG_DIR="${AIRFLOW_HOME}/dags"
                    fi
                    ;;
            esac
            shift 2
            ;;
        *)
            echo -e "${RED}Error: Unknown option $1${NC}"
            usage
            ;;
    esac
done
# Error handler for cleanup
# Removes the virtual environment, the config directory and the installed
# scanner script, then exits with status 1.
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
rm -rf "${SECURITY_VENV}" "${CONFIG_DIR}" /usr/local/bin/airflow-security-scan
exit 1
}
# Run cleanup when a command fails
trap cleanup ERR
# Check if running as root or with sudo
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}Error: This script must be run as root or with sudo${NC}"
exit 1
fi
echo -e "${BLUE}Airflow DAG Security Scanner Installation${NC}"
echo "=============================================="

# Auto-detect distribution
echo -e "${YELLOW}[1/8] Detecting Linux distribution...${NC}"
if [ ! -f /etc/os-release ]; then
    echo -e "${RED}Error: Cannot detect Linux distribution${NC}"
    exit 1
fi

# os-release supplies ID and PRETTY_NAME
. /etc/os-release
case "$ID" in
    ubuntu|debian)
        PKG_MGR="apt"
        PKG_UPDATE="apt update && apt upgrade -y"
        PKG_INSTALL="apt install -y"
        PYTHON_PKG="python3-pip python3-venv python3-dev"
        ;;
    almalinux|rocky|centos|rhel|ol|fedora)
        # The RHEL family and Fedora share the same dnf settings
        PKG_MGR="dnf"
        PKG_UPDATE="dnf update -y"
        PKG_INSTALL="dnf install -y"
        PYTHON_PKG="python3-pip python3-virtualenv python3-devel"
        ;;
    amzn)
        PKG_MGR="yum"
        PKG_UPDATE="yum update -y"
        PKG_INSTALL="yum install -y"
        PYTHON_PKG="python3-pip python3-virtualenv python3-devel"
        ;;
    *)
        echo -e "${RED}Error: Unsupported distribution: $ID${NC}"
        exit 1
        ;;
esac
echo -e "${GREEN}Detected: $PRETTY_NAME${NC}"
# Update system packages
echo -e "${YELLOW}[2/8] Updating system packages...${NC}"
# PKG_UPDATE may contain shell operators ("apt update && apt upgrade -y"),
# so a shell has to interpret it; plain word-splitting would hand "&&" and
# the rest to the package manager as literal arguments.
bash -c "$PKG_UPDATE"
# Intentionally unquoted: both variables hold space-separated word lists
$PKG_INSTALL $PYTHON_PKG git curl
# Create security user if it doesn't exist
echo -e "${YELLOW}[3/8] Creating security user...${NC}"
if ! id "$SECURITY_USER" &>/dev/null; then
# System account (-r) with bash as its shell and /var/lib/airflow as home
useradd -r -s /bin/bash -d /var/lib/airflow "$SECURITY_USER"
fi
# Create directories
echo -e "${YELLOW}[4/8] Creating directories...${NC}"
mkdir -p "$SECURITY_VENV" "$CONFIG_DIR" "$AIRFLOW_HOME/logs"
# NOTE(review): this recursively re-owns all of AIRFLOW_HOME, not only the
# directories created above - confirm that is intended on existing installs
chown -R "$SECURITY_USER:$SECURITY_USER" "$SECURITY_VENV" "$AIRFLOW_HOME"
# Install security tools in virtual environment
echo -e "${YELLOW}[5/8] Installing security tools...${NC}"
sudo -u "$SECURITY_USER" python3 -m venv "$SECURITY_VENV"
# Call the venv's interpreter directly: no activation is needed, and the
# venv path is passed as a quoted argument instead of being spliced into a
# "bash -c" command string.
sudo -u "$SECURITY_USER" "$SECURITY_VENV/bin/python" -m pip install --upgrade pip
sudo -u "$SECURITY_USER" "$SECURITY_VENV/bin/python" -m pip install "bandit[toml]" safety pre-commit gitpython
# Create Bandit configuration
echo -e "${YELLOW}[6/8] Creating Bandit configuration...${NC}"
# Unquoted heredoc delimiter so ${AIRFLOW_HOME} expands; the YAML contains no
# other shell metacharacters. Nesting matters: YAML is indentation-sensitive.
cat > "$CONFIG_DIR/bandit.yaml" << EOF
exclude_dirs:
  - '${AIRFLOW_HOME}/logs'
  - '${AIRFLOW_HOME}/plugins/__pycache__'
skips:
  - 'B101'
  - 'B601'
assert_used:
  skips:
    - '*_test.py'
    - '*/test_*.py'
hardcoded_password_string:
  word_list:
    - 'password'
    - 'pass'
    - 'passwd'
    - 'pwd'
    - 'secret'
    - 'token'
sql_injection:
  sql_statements:
    - 'select'
    - 'insert'
    - 'update'
    - 'delete'
    - 'create'
    - 'drop'
shell_injection:
  shell_wraps:
    - 'os.system'
    - 'os.popen'
    - 'subprocess.call'
    - 'subprocess.run'
EOF
# Create Safety policy configuration
# The heredoc delimiter is quoted, so the JSON below is written verbatim
cat > "$CONFIG_DIR/.safety-policy.json" << 'EOF'
{
"security": {
"ignore-cvss-severity-below": 7.0,
"ignore-cvss-unknown-severity": false,
"continue-on-vulnerability-error": false
},
"alert": {
"ignore-unpinned-requirements": false
},
"report": {
"only-report": false,
"output": {
"format": "json"
}
},
"ignore": {
"vulnerabilities": [],
"packages": []
}
}
EOF
# Create security scanner script
echo -e "${YELLOW}[7/8] Creating security scanner script...${NC}"
# Quoted delimiter: the Python source is written verbatim. Its indentation is
# significant, so the heredoc body must keep it.
cat > /usr/local/bin/airflow-security-scan << 'EOF'
#!/usr/bin/env python3
"""Airflow DAG Security Scanner"""
import os
import sys
import json
import subprocess
import argparse
from pathlib import Path
from datetime import datetime


class DAGSecurityScanner:
    """Run Bandit and Safety and collect their results in one report."""

    def __init__(self, dag_dir, config_dir):
        self.dag_dir = Path(dag_dir)
        self.config_dir = Path(config_dir)
        self.bandit_config = self.config_dir / "bandit.yaml"
        self.safety_policy = self.config_dir / ".safety-policy.json"
        self.results = {"timestamp": datetime.now().isoformat(), "scans": {}}

    def run_bandit_scan(self):
        """Run Bandit security scan on DAG files"""
        print("Running Bandit security scan...")
        cmd = [
            "/opt/airflow-security/bin/bandit",
            "-r", str(self.dag_dir),
            "-f", "json",
            "-c", str(self.bandit_config),
            "-ll"
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            self.results["scans"]["bandit"] = {
                "status": "completed",
                "return_code": result.returncode,
                "issues_found": result.returncode > 0
            }
            if result.stdout:
                bandit_data = json.loads(result.stdout)
                self.results["scans"]["bandit"]["results"] = bandit_data
            return result.returncode == 0
        except Exception as e:
            self.results["scans"]["bandit"] = {
                "status": "failed",
                "error": str(e)
            }
            return False

    def run_safety_scan(self):
        """Run Safety vulnerability scan"""
        print("Running Safety vulnerability scan...")
        cmd = [
            "/opt/airflow-security/bin/safety",
            "check",
            "--json",
            "--policy-file", str(self.safety_policy)
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
            self.results["scans"]["safety"] = {
                "status": "completed",
                "return_code": result.returncode,
                "vulnerabilities_found": result.returncode > 0
            }
            if result.stdout:
                try:
                    safety_data = json.loads(result.stdout)
                    self.results["scans"]["safety"]["results"] = safety_data
                except json.JSONDecodeError:
                    # Keep unparseable output so it is not lost from the report
                    self.results["scans"]["safety"]["raw_output"] = result.stdout
            return result.returncode == 0
        except Exception as e:
            self.results["scans"]["safety"] = {
                "status": "failed",
                "error": str(e)
            }
            return False

    def generate_report(self, output_file=None):
        """Generate security scan report"""
        if output_file:
            with open(output_file, 'w') as f:
                json.dump(self.results, f, indent=2)
        else:
            print(json.dumps(self.results, indent=2))


def main():
    parser = argparse.ArgumentParser(description="Airflow DAG Security Scanner")
    parser.add_argument("--dag-dir", default="/opt/airflow/dags", help="DAG directory path")
    parser.add_argument("--config-dir", default="/etc/airflow-security", help="Config directory path")
    parser.add_argument("--output", help="Output file for results")
    parser.add_argument("--fail-on-issues", action="store_true", help="Exit with error if issues found")
    args = parser.parse_args()
    scanner = DAGSecurityScanner(args.dag_dir, args.config_dir)
    bandit_ok = scanner.run_bandit_scan()
    safety_ok = scanner.run_safety_scan()
    scanner.generate_report(args.output)
    if args.fail_on_issues and (not bandit_ok or not safety_ok):
        sys.exit(1)


if __name__ == "__main__":
    main()
EOF
chmod 755 /usr/local/bin/airflow-security-scan
chown root:root /usr/local/bin/airflow-security-scan
# Set proper permissions
chmod 755 "$CONFIG_DIR"
# List the files explicitly: the glob "$CONFIG_DIR"/* does not match
# dotfiles, so .safety-policy.json would be skipped.
chmod 644 "$CONFIG_DIR/bandit.yaml" "$CONFIG_DIR/.safety-policy.json"
chown -R root:root "$CONFIG_DIR"
# Verify installation
echo -e "${YELLOW}[8/8] Verifying installation...${NC}"
# Check virtual environment
# Both tool executables must exist inside the venv
if [[ ! -f "$SECURITY_VENV/bin/bandit" ]]; then
echo -e "${RED}Error: Bandit not found in virtual environment${NC}"
exit 1
fi
if [[ ! -f "$SECURITY_VENV/bin/safety" ]]; then
echo -e "${RED}Error: Safety not found in virtual environment${NC}"
exit 1
fi
# Test the scanner script
# Running --help checks that the generated script parses and starts
if ! python3 /usr/local/bin/airflow-security-scan --help >/dev/null 2>&1; then
echo -e "${RED}Error: Security scanner script is not working${NC}"
exit 1
fi
# Print a summary with usage examples and the config file locations
echo -e "${GREEN}✓ Installation completed successfully!${NC}"
echo ""
echo -e "${BLUE}Usage Examples:${NC}"
echo " airflow-security-scan --dag-dir $DAG_DIR"
echo " airflow-security-scan --dag-dir $DAG_DIR --output /tmp/security-report.json"
echo " airflow-security-scan --fail-on-issues"
echo ""
echo -e "${BLUE}Configuration files:${NC}"
echo " Bandit config: $CONFIG_DIR/bandit.yaml"
echo " Safety policy: $CONFIG_DIR/.safety-policy.json"
echo ""
echo -e "${YELLOW}Note: Run security scans as the '$SECURITY_USER' user for proper permissions${NC}"
Review the script before running. Execute with: bash install.sh