Set up ModSecurity 3 with machine learning anomaly detection to automatically identify and block unknown attack patterns. This advanced configuration adds intelligent threat protection beyond traditional signature-based rules.
Prerequisites
- Apache web server installed
- Python 3 with pip
- Root or sudo access
- Basic understanding of HTTP requests
- Familiarity with machine learning concepts
What this solves
ModSecurity's machine learning anomaly detection analyzes HTTP request patterns to identify suspicious behavior that doesn't match known attack signatures. This approach catches zero-day exploits, custom attack vectors, and sophisticated threats that bypass traditional WAF rules. You'll configure automated scoring, threshold-based blocking, and adaptive learning to protect web applications from evolving threats.
Step-by-step installation
Install ModSecurity 3 with machine learning dependencies
Install ModSecurity 3 along with Python machine learning libraries and Apache connector for anomaly detection capabilities.
# Refresh package metadata before installing anything
sudo apt update
# Apache, the ModSecurity 3 library and headers, and the modsecurity-crs rule package
sudo apt install -y apache2 apache2-dev libmodsecurity3 libmodsecurity-dev modsecurity-crs
# Python tooling and the Apache connector module
sudo apt install -y python3 python3-pip python3-venv libapache2-mod-security3
# Install the ML libraries from distribution packages: a system-wide
# "sudo pip3 install" is refused on PEP 668 "externally managed" Python installs
sudo apt install -y python3-numpy python3-scipy python3-sklearn python3-pandas
Enable ModSecurity Apache module
Enable the ModSecurity module and verify it's loaded correctly in Apache.
# Enable the ModSecurity module and mod_unique_id
sudo a2enmod security3
sudo a2enmod unique_id
# Restart Apache so the newly enabled modules are loaded
sudo systemctl restart apache2
# List the loaded modules and keep only the lines mentioning "security"
sudo apache2ctl -M | grep security
Create ModSecurity configuration directory
Set up the directory structure for ModSecurity configuration files and machine learning models.
# Configuration files and pickled ML models
sudo mkdir -p /etc/modsecurity
sudo mkdir -p /etc/modsecurity/models
# Audit/debug logs and persistent data
sudo mkdir -p /var/log/modsecurity
sudo mkdir -p /var/lib/modsecurity/data
# Hand the log and data trees to the Apache user (www-data)
sudo chown -R www-data:www-data /var/log/modsecurity /var/lib/modsecurity
Configure base ModSecurity settings
Create the main ModSecurity configuration file with anomaly detection engine enabled.
# ModSecurity Core Configuration
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess On
SecRequestBodyLimit 13107200
SecRequestBodyNoFilesLimit 131072
SecResponseBodyLimit 524288

# Anomaly Detection Configuration
SecAction "id:900001,phase:1,nolog,pass,t:none,setvar:tx.anomaly_score_threshold=5"
SecAction "id:900002,phase:1,nolog,pass,t:none,setvar:tx.inbound_anomaly_score_threshold=5"
SecAction "id:900003,phase:1,nolog,pass,t:none,setvar:tx.outbound_anomaly_score_threshold=4"

# Machine Learning Integration
SecAction "id:900010,phase:1,nolog,pass,t:none,setvar:tx.ml_enabled=1"
SecAction "id:900011,phase:1,nolog,pass,t:none,setvar:tx.ml_model_path=/etc/modsecurity/models"

# Logging Configuration
SecAuditEngine RelevantOnly
SecAuditLog /var/log/modsecurity/audit.log
SecAuditLogParts ABDEFHIJZ
SecAuditLogType Serial

# Debug and Learning Mode
SecDebugLog /var/log/modsecurity/debug.log
SecDebugLogLevel 3

# Collection timeout
SecCollectionTimeout 600
Create machine learning anomaly detection script
Build a Python script that analyzes request patterns and generates anomaly scores for ModSecurity.
#!/usr/bin/env python3
import sys
import json
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import pickle
import os
from datetime import datetime
class ModSecurityMLDetector:
    """Score HTTP requests for anomalies with an Isolation Forest model.

    Three pickled objects are kept under ``model_path``: the Isolation
    Forest, a ``StandardScaler`` for the numerical features and a
    ``TfidfVectorizer`` for the request text.
    """

    _MODEL_FILES = ('isolation_forest.pkl', 'scaler.pkl', 'vectorizer.pkl')

    def __init__(self, model_path='/etc/modsecurity/models'):
        """Remember ``model_path`` and load (or create) the three models."""
        self.model_path = model_path
        self.isolation_forest = None
        self.scaler = None
        self.vectorizer = None
        self.load_or_create_models()

    def load_or_create_models(self):
        """Load existing models or create new, unfitted ones.

        The three files are treated as one unit: if any of them is missing,
        all three models are recreated so a fitted forest is never paired
        with an unfitted scaler or vectorizer.
        """
        loaded = []
        try:
            for file_name in self._MODEL_FILES:
                with open(os.path.join(self.model_path, file_name), 'rb') as f:
                    # NOTE(security): pickle.load runs arbitrary code from the
                    # file; the model directory must only be writable by
                    # trusted users.
                    loaded.append(pickle.load(f))
        except FileNotFoundError:
            # Create new models with default parameters
            self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
            self.scaler = StandardScaler()
            self.vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(1, 2))
        else:
            self.isolation_forest, self.scaler, self.vectorizer = loaded

    def extract_features(self, request_data):
        """Extract the numerical features of an HTTP request.

        Returns a ``(1, 10)`` float array: lengths of URI, query string and
        body, number of headers, counts of ``/ . ? & %`` in the URI and the
        URI's Shannon entropy.
        """
        uri = request_data.get('uri', '')
        features = [
            # Basic request metrics
            len(uri),
            len(request_data.get('query_string', '')),
            len(request_data.get('request_body', '')),
            len(request_data.get('headers', {})),
            # Character distribution anomalies
            uri.count('/'),
            uri.count('.'),
            uri.count('?'),
            uri.count('&'),
            uri.count('%'),
            # Entropy for randomness detection (0 for an empty URI)
            self.calculate_entropy(uri),
        ]
        return np.array(features, dtype=float).reshape(1, -1)

    def calculate_entropy(self, text):
        """Calculate the Shannon entropy of ``text`` in bits per character."""
        if not text:
            return 0
        prob = [text.count(c) / len(text) for c in set(text)]
        # Plain float so the value is always JSON serialisable
        return float(-sum(p * np.log2(p) for p in prob if p > 0))

    def _is_fitted(self):
        """Return True once the Isolation Forest has been trained."""
        # Every IsolationForest has a ``decision_function`` method whether or
        # not it was fitted, so probing for that method can never detect an
        # untrained model; ``estimators_`` only exists after ``fit()``.
        return hasattr(self.isolation_forest, 'estimators_')

    def detect_anomaly(self, request_data):
        """Score one request.

        Returns a dict with ``anomaly_score`` (clamped to 0-10),
        ``is_anomaly`` and ``raw_score``. On the first run (no trained
        model) the request is used as the baseline and scored 0. Any failure
        yields score 0 plus an ``error`` message instead of raising.
        """
        try:
            # Extract numerical features
            numerical_features = self.extract_features(request_data)
            # Extract text features
            text_data = ' '.join([
                request_data.get('uri', ''),
                request_data.get('query_string', ''),
                request_data.get('request_body', '')[:1000]  # Limit body size
            ])

            if not self._is_fitted():
                # Use current request as baseline (in production, train on clean data)
                scaled_features = self.scaler.fit_transform(numerical_features)
                text_features = self.vectorizer.fit_transform([text_data])
                combined_features = np.hstack([scaled_features, text_features.toarray()])
                self.isolation_forest.fit(combined_features)
                self.save_models()
                return {'anomaly_score': 0, 'is_anomaly': False}

            # Transform features
            scaled_features = self.scaler.transform(numerical_features)
            text_features = self.vectorizer.transform([text_data])
            combined_features = np.hstack([scaled_features, text_features.toarray()])

            # Convert NumPy scalars to built-in types: json.dumps cannot
            # serialise numpy.bool_, which made every scored request fail.
            raw_score = float(self.isolation_forest.decision_function(combined_features)[0])
            is_anomaly = bool(self.isolation_forest.predict(combined_features)[0] == -1)

            # Convert to 0-10 scale for ModSecurity
            normalized_score = max(0.0, min(10.0, (1 - raw_score) * 5))
            return {
                'anomaly_score': normalized_score,
                'is_anomaly': is_anomaly,
                'raw_score': raw_score
            }
        except Exception as e:
            # Deliberate best effort: a scoring failure must not block traffic
            return {'anomaly_score': 0, 'is_anomaly': False, 'error': str(e)}

    def save_models(self):
        """Save trained models to disk, creating ``model_path`` if needed."""
        os.makedirs(self.model_path, exist_ok=True)
        models = (self.isolation_forest, self.scaler, self.vectorizer)
        for file_name, model in zip(self._MODEL_FILES, models):
            with open(os.path.join(self.model_path, file_name), 'wb') as f:
                pickle.dump(model, f)
def main():
    """Score one request and print the result as a JSON object.

    The request JSON is taken from the first command-line argument. When no
    argument is given and standard input is not a terminal, it is read from
    standard input instead, so the script can also be fed through a pipe.
    Errors are reported as JSON with ``anomaly_score`` 0 rather than raised.
    """
    if len(sys.argv) >= 2:
        raw_request = sys.argv[1]
    else:
        # Only read stdin when something is piped in; never block on a TTY
        raw_request = '' if sys.stdin.isatty() else sys.stdin.read()
        if not raw_request.strip():
            print(json.dumps({'error': 'No request data provided'}))
            return
    try:
        request_data = json.loads(raw_request)
        detector = ModSecurityMLDetector()
        result = detector.detect_anomaly(request_data)
        print(json.dumps(result))
    except Exception as e:
        print(json.dumps({'error': str(e), 'anomaly_score': 0}))


if __name__ == '__main__':
    main()
Make the ML script executable
Set proper permissions for the machine learning detection script.
# Make the detector executable so it can be invoked directly
sudo chmod +x /etc/modsecurity/ml_detector.py
# Give the script to the Apache user (www-data)
sudo chown www-data:www-data /etc/modsecurity/ml_detector.py
Create ModSecurity ML integration rules
Configure ModSecurity rules that call the machine learning script and act on anomaly scores.
# Machine Learning Anomaly Detection Rules

# Phase 1: Initialize ML variables
# NOTE(review): REQUEST_BODY is referenced from a phase 1 rule; confirm the
# body is already populated at this point or move this rule to phase 2.
SecRule REQUEST_METHOD "@unconditionalMatch" \
    "id:100001,\
    phase:1,\
    nolog,\
    pass,\
    t:none,\
    setvar:'tx.ml_request_data={\"uri\":\"%{REQUEST_URI}\",\"query_string\":\"%{QUERY_STRING}\",\"method\":\"%{REQUEST_METHOD}\",\"headers\":{\"user_agent\":\"%{REQUEST_HEADERS.User-Agent}\",\"host\":\"%{REQUEST_HEADERS.Host}\"},\"request_body\":\"%{REQUEST_BODY}\"}'"

# Phase 2: Execute ML detection
SecRule REQUEST_METHOD "@unconditionalMatch" \
    "id:100002,\
    phase:2,\
    pass,\
    t:none,\
    exec:/etc/modsecurity/ml_detector.py '%{tx.ml_request_data}',\
    setvar:'tx.ml_result=%{EXEC}'"

# Phase 2: Parse ML results and set anomaly score
SecRule TX:ml_result "@rx \"anomaly_score\":\s*(\d+(?:\.\d+)?)" \
    "id:100003,\
    phase:2,\
    capture,\
    pass,\
    t:none,\
    setvar:'tx.ml_anomaly_score=%{tx.1}',\
    setvar:'tx.anomaly_score=+%{tx.1}',\
    logdata:'ML Anomaly Score: %{tx.ml_anomaly_score}'"

# Phase 2: High anomaly score detection
SecRule TX:ml_anomaly_score "@gt 7" \
    "id:100004,\
    phase:2,\
    block,\
    msg:'High ML anomaly score detected',\
    logdata:'ML Score: %{tx.ml_anomaly_score}, Request: %{tx.ml_request_data}',\
    setvar:'tx.anomaly_score=+5'"

# Phase 2: Medium anomaly score detection
SecRule TX:ml_anomaly_score "@ge 5" \
    "id:100005,\
    phase:2,\
    pass,\
    msg:'Medium ML anomaly score detected',\
    logdata:'ML Score: %{tx.ml_anomaly_score}',\
    setvar:'tx.anomaly_score=+3'"

# Phase 2: Block based on cumulative anomaly score
SecRule TX:ANOMALY_SCORE "@ge %{tx.inbound_anomaly_score_threshold}" \
    "id:100006,\
    phase:2,\
    block,\
    msg:'Inbound Anomaly Score Exceeded (Total Score: %{tx.anomaly_score})',\
    logdata:'Total anomaly score: %{tx.anomaly_score}, ML contribution: %{tx.ml_anomaly_score}'"

# Learning mode rule - log but don't block
SecRule TX:ml_anomaly_score "@gt 8" \
    "id:100007,\
    phase:5,\
    pass,\
    msg:'ML Learning Mode - High Anomaly Detected',\
    logdata:'Learning: ML Score %{tx.ml_anomaly_score}, URI: %{REQUEST_URI}, IP: %{REMOTE_ADDR}'"
Configure Apache virtual host with ModSecurity
Set up Apache virtual host to use ModSecurity with machine learning enabled.
# The directives below belong inside a virtual host container; SSLEngine is
# on, so the host listens on the HTTPS port.
<VirtualHost *:443>
    ServerName example.com
    DocumentRoot /var/www/html

    # SSL Configuration
    SSLEngine on
    SSLCertificateFile /etc/ssl/certs/ssl-cert-snakeoil.pem
    SSLCertificateKeyFile /etc/ssl/private/ssl-cert-snakeoil.key

    # ModSecurity Configuration
    ModSecurity3Engine On
    ModSecurity3RulesFile /etc/modsecurity/modsecurity.conf
    ModSecurity3RulesFile /etc/modsecurity/ml_rules.conf
    ModSecurity3RulesFile /usr/share/modsecurity-crs/crs-setup.conf
    ModSecurity3RulesFile /usr/share/modsecurity-crs/rules/*.conf

    # Logging
    ErrorLog ${APACHE_LOG_DIR}/ssl_error.log
    CustomLog ${APACHE_LOG_DIR}/ssl_access.log combined

    # Security Headers (require mod_headers)
    Header always set X-Content-Type-Options nosniff
    Header always set X-Frame-Options DENY
    Header always set X-XSS-Protection "1; mode=block"
</VirtualHost>
Create automated threat response script
Build a script that monitors ModSecurity logs and automatically responds to threats.
#!/usr/bin/env python3
import re
import json
import subprocess
import time
from datetime import datetime, timedelta
from collections import defaultdict
import os
class ThreatResponseSystem:
    """Follow the ModSecurity audit log and block abusive clients via iptables."""

    # How long an IP stays blocked before cleanup_old_blocks() lifts the block
    BLOCK_DURATION = timedelta(hours=1)
    # How often the per-IP threat counters are reset
    COUNT_RESET_INTERVAL = timedelta(hours=24)

    def __init__(self):
        """Initialise the log path and the empty block/counter bookkeeping."""
        self.log_file = '/var/log/modsecurity/audit.log'
        self.blocked_ips = set()
        self.block_times = {}  # ip -> datetime the block was installed
        self.threat_counts = defaultdict(int)
        self.last_check = datetime.now()

    def parse_modsec_log(self, log_line):
        """Parse one audit log line.

        Returns a dict with ``ip``, ``ml_score``, ``rule_id`` and
        ``timestamp``, or ``None`` when the line carries no valid client IP.
        """
        import ipaddress  # stdlib; imported here because the file header does not

        ip_match = re.search(r'"client_ip":"([^"]+)"', log_line)
        if not ip_match:
            return None
        try:
            # The value ends up as an iptables "-s" argument. Accept only a
            # single literal address, so a crafted value such as "0.0.0.0/0"
            # can never turn into a rule that drops all traffic.
            ip = str(ipaddress.ip_address(ip_match.group(1)))
        except ValueError:
            print(f"Error parsing log: invalid client IP {ip_match.group(1)!r}")
            return None

        # Match a well-formed number only, so float() below cannot fail
        score_match = re.search(r'ML Score: (\d+(?:\.\d+)?)', log_line)
        rule_match = re.search(r'id "(\d+)"', log_line)
        return {
            'ip': ip,
            'ml_score': float(score_match.group(1)) if score_match else 0,
            'rule_id': rule_match.group(1) if rule_match else None,
            'timestamp': datetime.now()
        }

    def should_block_ip(self, ip, ml_score):
        """Determine if IP should be blocked based on threat score and frequency.

        Every call counts as one more threat event for ``ip``.
        """
        self.threat_counts[ip] += 1
        seen = self.threat_counts[ip]
        # Block criteria
        if ml_score > 8:  # Very high anomaly score
            return True
        if seen > 5 and ml_score > 6:  # Repeated medium threats
            return True
        if seen > 10:  # Too many requests total
            return True
        return False

    def block_ip(self, ip):
        """Block IP using iptables.

        Returns True when a new rule was installed, False when the IP was
        already blocked or iptables failed.
        """
        if ip in self.blocked_ips:
            return False
        try:
            # Argument list (no shell), so ``ip`` is never interpreted by a shell
            subprocess.run(
                ['iptables', '-I', 'INPUT', '-s', ip, '-j', 'DROP'],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing iptables binary
            print(f"Failed to block IP {ip}: {e}")
            return False

        self.blocked_ips.add(ip)
        self.block_times[ip] = datetime.now()
        print(f"Blocked IP: {ip}")
        # Log the action
        with open('/var/log/modsecurity/threat_response.log', 'a') as f:
            f.write(f"{datetime.now().isoformat()} - BLOCKED {ip}\n")
        return True

    def unblock_ip(self, ip):
        """Remove the iptables rule for ``ip``; return True on success."""
        try:
            subprocess.run(
                ['iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP'],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            return False
        self.blocked_ips.discard(ip)
        self.block_times.pop(ip, None)
        print(f"Unblocked IP: {ip}")
        return True

    def cleanup_old_blocks(self):
        """Lift blocks older than BLOCK_DURATION and reset stale counters."""
        now = datetime.now()
        cutoff_time = now - self.BLOCK_DURATION
        expired = [ip for ip, since in self.block_times.items() if since <= cutoff_time]
        for ip in expired:
            if not self.unblock_ip(ip):
                # The rule could not be removed (it may already be gone).
                # Forget it anyway so the removal is not retried every second.
                print(f"Failed to unblock IP {ip}")
                self.blocked_ips.discard(ip)
                self.block_times.pop(ip, None)

        # Clean up threat counts older than 24 hours
        if now - self.last_check > self.COUNT_RESET_INTERVAL:
            self.threat_counts.clear()
            self.last_check = now

    def monitor_logs(self):
        """Main monitoring loop: tail the audit log and react to ML detections."""
        print("Starting threat response monitoring...")
        if not os.path.exists(self.log_file):
            print(f"Log file {self.log_file} not found")
            return

        with open(self.log_file, 'r') as f:
            # Go to end of file: only entries written from now on are handled
            f.seek(0, 2)
            while True:
                line = f.readline()
                if line:
                    parsed = self.parse_modsec_log(line)
                    if parsed and parsed['ml_score'] > 5:
                        ip = parsed['ip']
                        score = parsed['ml_score']
                        print(f"Threat detected: IP {ip}, ML Score: {score}")
                        if self.should_block_ip(ip, score):
                            self.block_ip(ip)
                else:
                    time.sleep(1)
                self.cleanup_old_blocks()
if __name__ == '__main__':
    import sys

    try:
        response_system = ThreatResponseSystem()
        response_system.monitor_logs()
    except KeyboardInterrupt:
        print("\nThreat response monitoring stopped.")
    except Exception as e:
        # Report on stderr and exit non-zero so the service manager records
        # the run as a failure instead of a clean exit
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
Set up threat response service
Create systemd service for automated threat response and enable it.
[Unit]
Description=ModSecurity Threat Response System
# Start after networking and Apache; Requires= ties this unit to apache2.service
After=network.target apache2.service
Requires=apache2.service

[Service]
Type=simple
# Runs as root; the script invokes iptables
User=root
ExecStart=/usr/bin/python3 /etc/modsecurity/threat_response.py
# Restart the monitor 10 seconds after any exit
Restart=always
RestartSec=10
# Send the script's output to the journal
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
Enable and start services
Enable all services and start ModSecurity with machine learning protection.
sudo chmod +x /etc/modsecurity/threat_response.py
# Pick up the new unit file, then enable the service at boot
sudo systemctl daemon-reload
sudo systemctl enable modsec-threat-response
# Enable the modules the virtual host depends on before enabling the site:
# mod_ssl for the SSL* directives and mod_headers for "Header always set"
sudo a2enmod ssl headers
sudo a2ensite 000-default-ssl
sudo systemctl restart apache2
sudo systemctl start modsec-threat-response
Create ML model training script
Build a script to train the machine learning models on clean traffic data.
#!/usr/bin/env python3
import json
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import pickle
import os
from datetime import datetime
def train_models_from_clean_data(clean_log_file, model_path='/etc/modsecurity/models'):
    """Train ML models using clean traffic data.

    ``clean_log_file`` holds one JSON object per line with the keys ``uri``,
    ``query_string``, ``request_body`` and ``headers``. Lines that are not
    valid JSON, or that decode to something other than an object, are
    skipped. The fitted Isolation Forest, scaler and vectorizer are pickled
    into ``model_path``.

    Returns True on success, False when the file is missing or fewer than
    10 usable samples were found.
    """
    print("Training ML models from clean data...")
    training_data = []
    # Read clean traffic logs
    try:
        with open(clean_log_file, 'r') as f:
            for line in f:
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line such as "[1, 2]" or "42" is valid JSON but has no
                # .get(); only objects can be turned into features
                if isinstance(data, dict):
                    training_data.append(data)
    except FileNotFoundError:
        print(f"Clean data file {clean_log_file} not found")
        return False

    if len(training_data) < 10:
        print("Not enough training data (minimum 10 samples)")
        return False

    # Extract features
    numerical_features = []
    text_data = []
    for data in training_data:
        uri = data.get('uri', '')
        query_string = data.get('query_string', '')
        request_body = data.get('request_body', '')
        # Numerical features, in the same order the detector extracts them
        numerical_features.append([
            len(uri),
            len(query_string),
            len(request_body),
            len(data.get('headers', {})),
            uri.count('/'),
            uri.count('.'),
            uri.count('?'),
            uri.count('&'),
            uri.count('%'),
            calculate_entropy(uri)
        ])
        # Text data (body truncated to 1000 characters)
        text_data.append(' '.join([uri, query_string, request_body[:1000]]))

    # Train models
    scaler = StandardScaler()
    vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(1, 2))
    isolation_forest = IsolationForest(contamination=0.1, random_state=42)

    # Fit transformers
    scaled_features = scaler.fit_transform(numerical_features)
    text_features = vectorizer.fit_transform(text_data)

    # Combine features and train
    combined_features = np.hstack([scaled_features, text_features.toarray()])
    isolation_forest.fit(combined_features)

    # Save models
    os.makedirs(model_path, exist_ok=True)
    artefacts = (
        ('isolation_forest.pkl', isolation_forest),
        ('scaler.pkl', scaler),
        ('vectorizer.pkl', vectorizer),
    )
    for file_name, model in artefacts:
        with open(os.path.join(model_path, file_name), 'wb') as f:
            pickle.dump(model, f)

    print(f"Models trained and saved to {model_path}")
    print(f"Training data: {len(training_data)} samples")
    return True
def calculate_entropy(text):
    """Return the Shannon entropy of ``text`` in bits per character (0 if empty)."""
    if not text:
        return 0
    length = len(text)
    weighted_logs = 0
    for symbol in set(text):
        share = text.count(symbol) / length
        if share > 0:
            weighted_logs += share * np.log2(share)
    return -weighted_logs
def create_sample_clean_data(path='/var/lib/modsecurity/clean_data.json'):
    """Create sample clean data for initial training.

    Writes ten benign example requests to ``path``, one JSON object per
    line, replacing any existing file. The parent directory is created when
    it does not exist yet.
    """
    clean_data = [
        {"uri": "/", "query_string": "", "request_body": "", "headers": {"User-Agent": "Mozilla/5.0"}},
        {"uri": "/about", "query_string": "", "request_body": "", "headers": {"User-Agent": "Chrome/91.0"}},
        {"uri": "/contact", "query_string": "name=john", "request_body": "", "headers": {}},
        {"uri": "/products", "query_string": "category=electronics", "request_body": "", "headers": {}},
        {"uri": "/search", "query_string": "q=laptop", "request_body": "", "headers": {}},
        {"uri": "/api/users", "query_string": "", "request_body": '{"name":"test"}', "headers": {}},
        {"uri": "/login", "query_string": "", "request_body": "username=user&password=pass", "headers": {}},
        {"uri": "/css/style.css", "query_string": "", "request_body": "", "headers": {}},
        {"uri": "/js/app.js", "query_string": "", "request_body": "", "headers": {}},
        {"uri": "/images/logo.png", "query_string": "", "request_body": "", "headers": {}},
    ]
    os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
    with open(path, 'w') as f:
        f.writelines(json.dumps(data) + '\n' for data in clean_data)
    print(f"Sample clean data created at {path}")
if __name__ == '__main__':
    clean_data_file = '/var/lib/modsecurity/clean_data.json'
    # Seed the sample requests only when no data set exists yet, so a
    # scheduled retraining run does not overwrite clean traffic that was
    # collected since the previous run
    if not os.path.exists(clean_data_file):
        create_sample_clean_data()
    success = train_models_from_clean_data(clean_data_file)
    if success:
        print("ML models training completed successfully")
    else:
        print("ML models training failed")
Train initial ML models
Run the model training script to create initial machine learning models.
sudo chmod +x /etc/modsecurity/train_models.py
# Train the initial models (run as root)
sudo python3 /etc/modsecurity/train_models.py
# Hand the models directory and its contents to the Apache user (www-data)
sudo chown -R www-data:www-data /etc/modsecurity/models
Configure ML tuning and monitoring
Create ML performance monitoring script
Build monitoring to track machine learning model performance and accuracy.
#!/usr/bin/env python3
import json
import time
import os
from datetime import datetime, timedelta
from collections import defaultdict
import subprocess
class MLPerformanceMonitor:
    """Collect statistics about ML detections found in the audit log."""

    def __init__(self):
        """Set up empty counters and merge in previously saved statistics."""
        self.stats_file = '/var/log/modsecurity/ml_stats.json'
        self.stats = defaultdict(int)
        self.false_positives = []
        self.load_stats()

    def load_stats(self):
        """Load existing statistics; a missing or corrupt file is ignored."""
        try:
            with open(self.stats_file, 'r') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return
        self.stats.update(data.get('stats', {}))
        self.false_positives = data.get('false_positives', [])

    def save_stats(self):
        """Save statistics to file."""
        data = {
            'stats': dict(self.stats),
            'false_positives': self.false_positives[-100:],  # Keep last 100
            'last_updated': datetime.now().isoformat()
        }
        with open(self.stats_file, 'w') as f:
            json.dump(data, f, indent=2)

    def analyze_log_entry(self, log_line):
        """Update the counters from one audit log line."""
        import re  # stdlib; imported here because the file header does not

        if 'ML Score' in log_line:
            self.stats['ml_detections'] += 1
            # Match a well-formed number only, so float() cannot fail
            score_match = re.search(r'ML Score: (\d+(?:\.\d+)?)', log_line)
            if score_match:
                score = float(score_match.group(1))
                if score > 8:
                    self.stats['high_risk_detections'] += 1
                elif score > 5:
                    self.stats['medium_risk_detections'] += 1
                else:
                    self.stats['low_risk_detections'] += 1
        # NOTE(review): the original nesting of this check was lost when the
        # snippet was flattened; it is applied to every line here - confirm.
        if 'blocked' in log_line.lower():
            self.stats['blocked_requests'] += 1

    def check_model_performance(self):
        """Classify the model as 'too_sensitive', 'too_lenient' or 'normal'.

        The verdict is based on the share of high-risk detections among all
        ML detections; with no detections the status is 'normal'.
        """
        total_detections = self.stats.get('ml_detections', 0)
        high_risk = self.stats.get('high_risk_detections', 0)
        if total_detections > 0:
            high_risk_ratio = high_risk / total_detections
            # If too many high-risk detections, model may be too sensitive
            if high_risk_ratio > 0.3:
                return 'too_sensitive'
            # If too few high-risk detections, model may be too lenient
            if high_risk_ratio < 0.05:
                return 'too_lenient'
        return 'normal'

    def generate_report(self):
        """Generate a performance report as a JSON-serialisable dict."""
        return {
            'timestamp': datetime.now().isoformat(),
            'total_ml_detections': self.stats.get('ml_detections', 0),
            'high_risk_detections': self.stats.get('high_risk_detections', 0),
            'medium_risk_detections': self.stats.get('medium_risk_detections', 0),
            'low_risk_detections': self.stats.get('low_risk_detections', 0),
            'blocked_requests': self.stats.get('blocked_requests', 0),
            'model_status': self.check_model_performance(),
            'false_positives_count': len(self.false_positives)
        }

    def monitor_continuously(self):
        """Main monitoring loop: tail the audit log until interrupted."""
        audit_log = '/var/log/modsecurity/audit.log'
        if not os.path.exists(audit_log):
            print(f"Audit log {audit_log} not found")
            return

        print("Starting ML performance monitoring...")
        # Count at the last periodic save. Without it the "% 100 == 0" test
        # is true for every line while the counter is 0 (or unchanged).
        last_saved_count = self.stats['ml_detections']
        current = datetime.now()
        last_report_hour = (current.date(), current.hour)

        with open(audit_log, 'r') as f:
            f.seek(0, 2)  # Go to end
            while True:
                line = f.readline()
                if line:
                    self.analyze_log_entry(line)
                    detections = self.stats['ml_detections']
                    # Save stats every 100 detections
                    if detections != last_saved_count and detections % 100 == 0:
                        last_saved_count = detections
                        self.save_stats()
                        report = self.generate_report()
                        print(f"ML Stats: {json.dumps(report, indent=2)}")
                else:
                    time.sleep(5)

                # Generate hourly reports: exactly one per clock hour, even
                # when the loop is busy during minute 0
                current = datetime.now()
                hour_key = (current.date(), current.hour)
                if hour_key != last_report_hour:
                    last_report_hour = hour_key
                    self.save_stats()
                    report = self.generate_report()
                    with open('/var/log/modsecurity/ml_report.json', 'w') as report_file:
                        json.dump(report, report_file, indent=2)
if __name__ == '__main__':
    import sys

    try:
        monitor = MLPerformanceMonitor()
        if '--report' in sys.argv[1:]:
            # One-shot mode for scheduled jobs: print the current report and
            # exit instead of following the log forever
            print(json.dumps(monitor.generate_report(), indent=2))
        else:
            monitor.monitor_continuously()
    except KeyboardInterrupt:
        print("\nML monitoring stopped.")
Set up model retraining cron job
Create automated model retraining based on new clean traffic data.
sudo chmod +x /etc/modsecurity/ml_monitor.py
sudo crontab -e
Add these entries to retrain models weekly:
# Retrain ML models weekly (Sundays at 02:00) with accumulated clean data
0 2 * * 0 /usr/bin/python3 /etc/modsecurity/train_models.py >> /var/log/modsecurity/training.log 2>&1
# Generate daily ML performance reports (every day at 06:00)
0 6 * * * /usr/bin/python3 /etc/modsecurity/ml_monitor.py --report >> /var/log/modsecurity/ml_performance.log 2>&1
Verify your setup
Test the machine learning anomaly detection system with various request patterns.
# Check ModSecurity is loaded
sudo apache2ctl -M | grep security
# Verify ML models exist
sudo ls -la /etc/modsecurity/models/
# Test normal request (should pass)
curl -k https://example.com/
# Test suspicious request (should trigger ML detection)
curl -k "https://example.com/admin.php?cmd=ls%20-la&exec=cat%20/etc/passwd"
# Check audit logs for ML scoring
sudo tail -f /var/log/modsecurity/audit.log | grep "ML Score"
# Verify threat response is running
sudo systemctl status modsec-threat-response
# Check blocked IPs
sudo iptables -L INPUT | grep DROP
# Test ML detector directly (the script reads the request JSON from its first argument)
sudo -u www-data python3 /etc/modsecurity/ml_detector.py '{"uri":"/test","query_string":"","request_body":"","headers":{}}'
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Python import errors | Missing ML dependencies | sudo pip3 install numpy scipy scikit-learn pandas |
| ML models not found | Training script didn't run | sudo python3 /etc/modsecurity/train_models.py |
| Permission denied on models | Wrong file ownership | sudo chown -R www-data:www-data /etc/modsecurity/models |
| Too many false positives | Model too sensitive | Increase threshold in /etc/modsecurity/ml_rules.conf from 5 to 7 |
| Threat response not blocking | Service not running | sudo systemctl start modsec-threat-response |
| No ML scores in logs | Rules not loaded | Check ModSecurity3RulesFile path in virtual host config |
Tune ML detection rules
Fine-tune the machine learning detection based on your traffic patterns and security requirements.
Edit the anomaly thresholds in /etc/modsecurity/ml_rules.conf:
# For stricter security (more blocking)
sudo sed -i 's/@gt 7/@gt 5/g' /etc/modsecurity/ml_rules.conf
# For looser security (fewer false positives)
sudo sed -i 's/@gt 7/@gt 9/g' /etc/modsecurity/ml_rules.conf
# Restart Apache to apply changes
sudo systemctl restart apache2
Monitor and adjust the learning rate by collecting more clean training data:
# Extract clean traffic from audit logs (manual review required)
sudo grep -v "anomaly" /var/log/modsecurity/audit.log | head -100 > /var/lib/modsecurity/additional_clean_data.json
# Retrain with additional data
sudo python3 /etc/modsecurity/train_models.py
For better integration with your security infrastructure, consider feeding these logs into a centralized ModSecurity log analysis pipeline, or integrating with a SOAR platform for automated incident response.
Next steps
- Configure advanced rate limiting and DDoS protection with ModSecurity
- Integrate ML threat data with Elasticsearch for advanced analytics
- Set up ModSecurity cluster with shared ML models
- Build custom ML models for application-specific threat detection
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
# Abort on command failure (-e), on use of an unset variable (-u) and when
# any stage of a pipeline fails (pipefail)
set -euo pipefail
# ModSecurity ML Anomaly Detection Install Script
# Production-ready installer for automated threat protection
# Colors for output (ANSI escape sequences; NC resets the colour)
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Configuration
# Domain comes from the first argument and defaults to "localhost"
DOMAIN="${1:-localhost}"
MODSEC_DIR="/etc/modsecurity"
LOG_DIR="/var/log/modsecurity"
DATA_DIR="/var/lib/modsecurity"
ML_SCRIPT_PATH="/usr/local/bin/modsec_ml_detector.py"
# Print the invocation syntax with an example, then exit with status 1.
usage() {
    printf '%s\n' "Usage: $0 [domain]" "Example: $0 example.com"
    exit 1
}
# Print $1 prefixed with a green "[YYYY-MM-DD HH:MM:SS]" timestamp.
log() {
    printf '%b\n' "${GREEN}[$(date '+%Y-%m-%d %H:%M:%S')]${NC} $1"
}
# Print $1 prefixed with a yellow "[WARNING]" tag.
warn() {
    printf '%b\n' "${YELLOW}[WARNING]${NC} $1"
}
# Print $1 on stderr prefixed with a red "[ERROR]" tag, then exit with status 1.
error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1" >&2
    exit 1
}
# Undo a partial installation: stop the web server (if it is known yet),
# remove the ML script and exit with status 1.
cleanup() {
    warn "Installation failed. Cleaning up..."
    # WEB_SERVICE is only assigned by detect_distro; with "set -u" an
    # unguarded expansion would abort the handler on early failures
    if [[ -n "${WEB_SERVICE:-}" ]]; then
        systemctl stop "$WEB_SERVICE" 2>/dev/null || true
    fi
    rm -f "$ML_SCRIPT_PATH" 2>/dev/null || true
    exit 1
}

# Shell functions do not inherit the ERR trap unless errtrace is enabled;
# without it, failures inside the install steps would bypass cleanup
set -o errtrace
trap cleanup ERR
# Abort unless the script runs as root, python3 is on PATH and
# /etc/os-release exists.
check_prereqs() {
    log "[1/8] Checking prerequisites..."

    (( EUID == 0 )) || error "This script must be run as root or with sudo"
    command -v python3 > /dev/null 2>&1 || error "Python3 is required but not installed"
    [[ -f /etc/os-release ]] || error "Cannot detect operating system"
}
# Read /etc/os-release and set the package manager, web server and module
# variables (PKG_*, WEB_*, MODSEC_*) for the detected distribution family.
detect_distro() {
    log "[2/8] Detecting distribution..."
    . /etc/os-release

    # ID and PRETTY_NAME are optional in os-release; give them defaults so
    # "set -u" does not abort with an unbound-variable error
    local distro_id="${ID:-unknown}"

    case "$distro_id" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_UPDATE="apt update -qq"
            PKG_INSTALL="apt install -y"
            WEB_SERVICE="apache2"
            WEB_USER="www-data"
            WEB_CONFIG_DIR="/etc/apache2"
            WEB_SITES_DIR="$WEB_CONFIG_DIR/sites-available"
            MODSEC_MODULE="libapache2-mod-security3"
            MODSEC_LOAD_MODULE="security3"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora|amzn)
            # Amazon Linux uses yum, the rest of the family dnf; every other
            # setting is shared
            if [[ "$distro_id" == "amzn" ]]; then
                PKG_MGR="yum"
            else
                PKG_MGR="dnf"
            fi
            PKG_UPDATE="$PKG_MGR update -y -q"
            PKG_INSTALL="$PKG_MGR install -y -q"
            WEB_SERVICE="httpd"
            WEB_USER="apache"
            WEB_CONFIG_DIR="/etc/httpd"
            WEB_SITES_DIR="$WEB_CONFIG_DIR/conf.d"
            MODSEC_MODULE="modsecurity-apache"
            MODSEC_LOAD_MODULE="security3"
            ;;
        *)
            error "Unsupported distribution: $distro_id"
            ;;
    esac

    log "Detected: ${PRETTY_NAME:-$distro_id} using $PKG_MGR"
}
# Install Apache, ModSecurity and the Python ML libraries with the package
# manager chosen by detect_distro.
install_packages() {
    log "[3/8] Installing packages and dependencies..."

    # PKG_UPDATE / PKG_INSTALL hold a command plus options, so they are
    # expanded unquoted on purpose
    $PKG_UPDATE

    if [[ "$PKG_MGR" == "apt" ]]; then
        $PKG_INSTALL apache2 apache2-dev libmodsecurity3 libmodsecurity-dev modsecurity-crs
        $PKG_INSTALL python3 python3-pip python3-venv $MODSEC_MODULE
        $PKG_INSTALL python3-numpy python3-scipy python3-sklearn python3-pandas
    else
        # Best effort: the epel-release package is not offered by every
        # distribution handled here, and a failed install must not abort
        # the whole run (NOTE(review): confirm which targets lack it)
        $PKG_INSTALL epel-release || warn "EPEL repository package not available; continuing without it"
        $PKG_INSTALL httpd httpd-devel modsecurity $MODSEC_MODULE python3 python3-pip
        $PKG_INSTALL python3-numpy python3-scipy python3-scikit-learn python3-pandas
    fi

    # Fall back to pip only when the system packages are insufficient, so
    # working distribution packages are not overwritten
    if ! python3 -c 'import numpy, scipy, sklearn, pandas' 2>/dev/null; then
        pip3 install --quiet --upgrade numpy scipy scikit-learn pandas 2>/dev/null || warn "Some Python packages may need manual installation"
    fi
}
# Enable the ModSecurity module, restart the web server and verify that a
# security module is actually loaded.
configure_apache() {
    log "[4/8] Configuring Apache and ModSecurity..."
    local ctl_cmd loaded_modules

    if [[ "$PKG_MGR" == "apt" ]]; then
        a2enmod security3
        a2enmod unique_id
        a2enmod rewrite
        ctl_cmd="apache2ctl"
    else
        # Create module configuration for RHEL-based systems
        cat > "$WEB_CONFIG_DIR/conf.modules.d/00-security.conf" << 'EOF'
LoadModule security3_module modules/mod_security3.so
LoadModule unique_id_module modules/mod_unique_id.so
EOF
        ctl_cmd="httpd"
    fi

    # Verify module loading
    systemctl restart "$WEB_SERVICE"
    sleep 2

    # Capture the module list before searching it. With "set -o pipefail",
    # "cmd | grep -q" can fail spuriously: grep exits on the first match
    # and the writer may then be killed by SIGPIPE.
    loaded_modules="$("$ctl_cmd" -M)" || error "ModSecurity module failed to load"
    grep -q security <<< "$loaded_modules" || error "ModSecurity module failed to load"
}
# Create the configuration, model, log and data directories and set their
# ownership and permissions.
create_directories() {
    log "[5/8] Creating directory structure..."
    mkdir -p "$MODSEC_DIR"/{rules,models}
    mkdir -p "$LOG_DIR"
    mkdir -p "$DATA_DIR/data"

    # Set proper permissions
    chown -R root:root "$MODSEC_DIR"
    chmod 755 "$MODSEC_DIR" "$MODSEC_DIR"/{rules,models}
    # The detector writes its pickled models into models/ when none exist,
    # and it runs as the web server user, so that directory must be
    # writable by that user
    chown -R "$WEB_USER:$WEB_USER" "$MODSEC_DIR/models"
    chown -R "$WEB_USER:$WEB_USER" "$LOG_DIR" "$DATA_DIR"
    chmod 755 "$LOG_DIR" "$DATA_DIR" "$DATA_DIR/data"

    # SELinux context for RHEL-based systems
    if command -v setsebool &> /dev/null; then
        setsebool -P httpd_can_network_connect 1 2>/dev/null || warn "Could not set SELinux boolean"
        restorecon -R "$MODSEC_DIR" "$LOG_DIR" "$DATA_DIR" 2>/dev/null || warn "Could not restore SELinux contexts"
    fi
}
# Write the base ModSecurity configuration to $MODSEC_DIR/modsecurity.conf.
create_config() {
    log "[6/8] Creating ModSecurity configuration..."

    # Unquoted heredoc: $LOG_DIR and $MODSEC_DIR are expanded so the
    # generated paths follow the variables defined at the top of the script.
    # The template contains no other "$", backtick or backslash.
    cat > "$MODSEC_DIR/modsecurity.conf" << EOF
# ModSecurity Core Configuration
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess On
SecRequestBodyLimit 13107200
SecRequestBodyNoFilesLimit 131072
SecResponseBodyLimit 524288
# Anomaly Detection Configuration
SecAction "id:900001,phase:1,nolog,pass,t:none,setvar:tx.anomaly_score_threshold=5"
SecAction "id:900002,phase:1,nolog,pass,t:none,setvar:tx.inbound_anomaly_score_threshold=5"
SecAction "id:900003,phase:1,nolog,pass,t:none,setvar:tx.outbound_anomaly_score_threshold=4"
# Machine Learning Integration
SecAction "id:900010,phase:1,nolog,pass,t:none,setvar:tx.ml_enabled=1"
SecAction "id:900011,phase:1,nolog,pass,t:none,setvar:tx.ml_model_path=$MODSEC_DIR/models"
# Logging Configuration
SecAuditEngine RelevantOnly
SecAuditLog $LOG_DIR/audit.log
SecAuditLogParts ABDEFHIJZ
SecAuditLogType Serial
# Debug and Learning Mode
SecDebugLog $LOG_DIR/debug.log
SecDebugLogLevel 3
# Collection timeout
SecCollectionTimeout 600
EOF
    chmod 644 "$MODSEC_DIR/modsecurity.conf"
}
create_ml_script() {
    # Generate the Python anomaly-scoring helper at $ML_SCRIPT_PATH, then make
    # it executable (mode 755) and owned by root:root.
    # Reads: ML_SCRIPT_PATH. Uses the script's log helper for progress output.
    log "[7/8] Creating machine learning detection script..."

    # The delimiter is quoted, so the Python source below is written verbatim
    # (no shell expansion). Its indentation is significant and must be kept.
    cat > "$ML_SCRIPT_PATH" << 'EOF'
#!/usr/bin/env python3
"""Score one HTTP request for anomalies with an Isolation Forest.

The request is passed as a JSON object in the first command-line argument.
A JSON object is printed to stdout: either the detection result or
{"error": "..."} (in which case the exit status is 1).
"""
import sys
import json
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import pickle
import os
import re
from datetime import datetime


class ModSecurityMLDetector:
    """Anomaly detector over ten numeric features of an HTTP request."""

    def __init__(self, model_path='/etc/modsecurity/models'):
        """Load the persisted models from model_path, creating them if absent."""
        self.model_path = model_path
        self.isolation_forest = None
        self.scaler = None
        self.vectorizer = None
        self.load_or_create_models()

    def load_or_create_models(self):
        """Load the three pickled models; bootstrap new ones if a file is missing."""
        iso_path = os.path.join(self.model_path, 'isolation_forest.pkl')
        scaler_path = os.path.join(self.model_path, 'scaler.pkl')
        vectorizer_path = os.path.join(self.model_path, 'vectorizer.pkl')
        # SECURITY: pickle.load() can execute code embedded in the file. The
        # model directory must only be writable by trusted accounts.
        try:
            with open(iso_path, 'rb') as f:
                self.isolation_forest = pickle.load(f)
            with open(scaler_path, 'rb') as f:
                self.scaler = pickle.load(f)
            with open(vectorizer_path, 'rb') as f:
                self.vectorizer = pickle.load(f)
        except FileNotFoundError:
            self.create_initial_models()

    def create_initial_models(self):
        """Fit placeholder models and try to persist them."""
        self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
        self.scaler = StandardScaler()
        self.vectorizer = TfidfVectorizer(max_features=1000)
        # Train with minimal normal data (random placeholder values, one column
        # per feature produced by extract_features).
        normal_data = np.random.rand(100, 10)
        # detect_anomaly() scales features before scoring them, so the forest
        # has to be trained on scaled data as well.
        self.scaler.fit(normal_data)
        self.isolation_forest.fit(self.scaler.transform(normal_data))
        normal_text = ['GET /index.html', 'POST /login', 'GET /style.css']
        self.vectorizer.fit(normal_text)
        # Persisting is best effort: when the model directory is not writable
        # by the invoking user, keep using the in-memory models instead of
        # failing the whole detection run.
        try:
            self.save_models()
        except OSError as exc:
            print(f'warning: could not save models to {self.model_path}: {exc}',
                  file=sys.stderr)

    def save_models(self):
        """Pickle the forest, scaler and vectorizer into model_path.

        Raises OSError if a file cannot be written.
        """
        iso_path = os.path.join(self.model_path, 'isolation_forest.pkl')
        scaler_path = os.path.join(self.model_path, 'scaler.pkl')
        vectorizer_path = os.path.join(self.model_path, 'vectorizer.pkl')
        with open(iso_path, 'wb') as f:
            pickle.dump(self.isolation_forest, f)
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scaler, f)
        with open(vectorizer_path, 'wb') as f:
            pickle.dump(self.vectorizer, f)

    def extract_features(self, request_data):
        """Return a (1, 10) array of numeric features for one request dict.

        Keys read: REQUEST_URI, REQUEST_METHOD, HTTP_USER_AGENT, QUERY_STRING,
        CONTENT_LENGTH. Missing keys fall back to an empty string or 0.
        """
        features = []
        uri = request_data.get('REQUEST_URI', '')
        method = request_data.get('REQUEST_METHOD', '')
        user_agent = request_data.get('HTTP_USER_AGENT', '')
        # Basic numerical features
        features.append(len(uri))
        features.append(len(user_agent))
        features.append(uri.count('/'))
        features.append(uri.count('?'))
        features.append(uri.count('&'))
        features.append(len(re.findall(r'[<>"\']', uri)))
        features.append(len(re.findall(r'(union|select|insert|update|delete)', uri.lower())))
        features.append(1 if method in ['POST', 'PUT', 'DELETE'] else 0)
        features.append(len(request_data.get('QUERY_STRING', '')))
        features.append(int(request_data.get('CONTENT_LENGTH', 0) or 0))
        return np.array(features).reshape(1, -1)

    def detect_anomaly(self, request_data):
        """Score one request.

        Returns a dict with 'anomaly_score' (float clamped to 0..10),
        'is_anomaly' (bool) and 'timestamp' (ISO-8601 string, local time).
        """
        features = self.extract_features(request_data)
        features_scaled = self.scaler.transform(features)
        anomaly_score = self.isolation_forest.decision_function(features_scaled)[0]
        is_anomaly = self.isolation_forest.predict(features_scaled)[0] == -1
        # Convert to 0-10 scale (higher = more suspicious)
        normalized_score = max(0, min(10, (1 - anomaly_score) * 5))
        return {
            'anomaly_score': float(normalized_score),
            'is_anomaly': bool(is_anomaly),
            'timestamp': datetime.now().isoformat()
        }


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(json.dumps({'error': 'No request data provided'}))
        sys.exit(1)
    try:
        request_data = json.loads(sys.argv[1])
        detector = ModSecurityMLDetector()
        result = detector.detect_anomaly(request_data)
        print(json.dumps(result))
    except Exception as e:
        # Top-level boundary: report any failure as JSON so the caller can parse it.
        print(json.dumps({'error': str(e)}))
        sys.exit(1)
EOF

    chmod 755 "$ML_SCRIPT_PATH"
    chown root:root "$ML_SCRIPT_PATH"
}
configure_virtualhost() {
    # Write the port-80 virtual host that includes the ModSecurity
    # configuration, validate the web server configuration, then restart and
    # enable the service.
    # Reads: PKG_MGR, WEB_SITES_DIR, DOMAIN, MODSEC_DIR, WEB_SERVICE.
    # Sets:  SITE_CONFIG (left global, as before).
    # Returns 1 without restarting the service if the configuration test fails.
    log "[8/8] Configuring virtual host..."

    local error_log access_log
    if [[ "$PKG_MGR" == "apt" ]]; then
        SITE_CONFIG="$WEB_SITES_DIR/000-default.conf"
        # Single-quoted on purpose: ${APACHE_LOG_DIR} must reach the config
        # file literally rather than being expanded by this script.
        error_log='${APACHE_LOG_DIR}/error.log'
        access_log='${APACHE_LOG_DIR}/access.log'
    else
        SITE_CONFIG="$WEB_SITES_DIR/modsecurity.conf"
        error_log='logs/error_log'
        access_log='logs/access_log'
    fi

    # Unquoted delimiter: $DOMAIN, $MODSEC_DIR and the log paths are expanded
    # once; the expanded values are not re-scanned by the shell.
    cat > "$SITE_CONFIG" << EOF
<VirtualHost *:80>
ServerName $DOMAIN
DocumentRoot /var/www/html
SecRuleEngine On
Include $MODSEC_DIR/modsecurity.conf
ErrorLog $error_log
CustomLog $access_log combined
</VirtualHost>
EOF

    local -a config_test
    if [[ "$PKG_MGR" == "apt" ]]; then
        a2ensite 000-default
        config_test=(apache2ctl configtest)
    else
        config_test=(httpd -t)
    fi
    chmod 644 "$SITE_CONFIG"

    # Test configuration and restart. A failed test must not be followed by a
    # restart, otherwise a broken configuration would take the server down.
    if ! "${config_test[@]}"; then
        warn "Web server configuration test failed; not restarting $WEB_SERVICE"
        return 1
    fi
    systemctl restart "$WEB_SERVICE"
    systemctl enable "$WEB_SERVICE"
}
verify_installation() {
Review the script before running. Execute with: bash install.sh