Configure ModSecurity machine learning anomaly detection for automated threat protection

Advanced 45 min Apr 30, 2026 62 views
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up ModSecurity 3 with machine learning anomaly detection to automatically identify and block unknown attack patterns. This advanced configuration adds intelligent threat protection beyond traditional signature-based rules.

Prerequisites

  • Apache web server installed
  • Python 3 with pip
  • Root or sudo access
  • Basic understanding of HTTP requests
  • Familiarity with machine learning concepts

What this solves

ModSecurity's machine learning anomaly detection analyzes HTTP request patterns to identify suspicious behavior that doesn't match known attack signatures. This approach catches zero-day exploits, custom attack vectors, and sophisticated threats that bypass traditional WAF rules. You'll configure automated scoring, threshold-based blocking, and adaptive learning to protect web applications from evolving threats.

Step-by-step installation

Install ModSecurity 3 with machine learning dependencies

Install ModSecurity 3 along with Python machine learning libraries and Apache connector for anomaly detection capabilities.

sudo apt update
sudo apt install -y apache2 apache2-dev libmodsecurity3 libmodsecurity-dev modsecurity-crs
sudo apt install -y python3 python3-pip python3-venv libapache2-mod-security3
sudo pip3 install numpy scipy scikit-learn pandas
sudo dnf update -y
sudo dnf install -y httpd httpd-devel modsecurity modsecurity-apache python3 python3-pip
sudo dnf install -y epel-release
sudo dnf install -y python3-numpy python3-scipy python3-scikit-learn python3-pandas

Enable ModSecurity Apache module

Enable the ModSecurity module and verify it's loaded correctly in Apache.

sudo a2enmod security3
sudo a2enmod unique_id
sudo systemctl restart apache2
sudo apache2ctl -M | grep security
echo "LoadModule security3_module modules/mod_security3.so" | sudo tee /etc/httpd/conf.modules.d/00-security.conf
sudo systemctl restart httpd
sudo httpd -M | grep security

Create ModSecurity configuration directory

Set up the directory structure for ModSecurity configuration files and machine learning models.

sudo mkdir -p /etc/modsecurity
sudo mkdir -p /etc/modsecurity/models
sudo mkdir -p /var/log/modsecurity
sudo mkdir -p /var/lib/modsecurity/data
sudo chown -R www-data:www-data /var/log/modsecurity /var/lib/modsecurity

Configure base ModSecurity settings

Create the main ModSecurity configuration file with anomaly detection engine enabled.

# ModSecurity Core Configuration
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess On
SecRequestBodyLimit 13107200
SecRequestBodyNoFilesLimit 131072
SecResponseBodyLimit 524288

Anomaly Detection Configuration

SecAction "id:900001,phase:1,nolog,pass,t:none,setvar:tx.anomaly_score_threshold=5" SecAction "id:900002,phase:1,nolog,pass,t:none,setvar:tx.inbound_anomaly_score_threshold=5" SecAction "id:900003,phase:1,nolog,pass,t:none,setvar:tx.outbound_anomaly_score_threshold=4"

Machine Learning Integration

SecAction "id:900010,phase:1,nolog,pass,t:none,setvar:tx.ml_enabled=1" SecAction "id:900011,phase:1,nolog,pass,t:none,setvar:tx.ml_model_path=/etc/modsecurity/models"

Logging Configuration

SecAuditEngine RelevantOnly SecAuditLog /var/log/modsecurity/audit.log SecAuditLogParts ABDEFHIJZ SecAuditLogType Serial

Debug and Learning Mode

SecDebugLog /var/log/modsecurity/debug.log SecDebugLogLevel 3

Collection timeout

SecCollectionTimeout 600

Create machine learning anomaly detection script

Build a Python script that analyzes request patterns and generates anomaly scores for ModSecurity.

#!/usr/bin/env python3
import sys
import json
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import pickle
import os
from datetime import datetime

class ModSecurityMLDetector:
    def __init__(self, model_path='/etc/modsecurity/models'):
        self.model_path = model_path
        self.isolation_forest = None
        self.scaler = None
        self.vectorizer = None
        self.load_or_create_models()
    
    def load_or_create_models(self):
        """Load existing models or create new ones"""
        iso_path = os.path.join(self.model_path, 'isolation_forest.pkl')
        scaler_path = os.path.join(self.model_path, 'scaler.pkl')
        vectorizer_path = os.path.join(self.model_path, 'vectorizer.pkl')
        
        try:
            with open(iso_path, 'rb') as f:
                self.isolation_forest = pickle.load(f)
            with open(scaler_path, 'rb') as f:
                self.scaler = pickle.load(f)
            with open(vectorizer_path, 'rb') as f:
                self.vectorizer = pickle.load(f)
        except FileNotFoundError:
            # Create new models with default parameters
            self.isolation_forest = IsolationForest(contamination=0.1, random_state=42)
            self.scaler = StandardScaler()
            self.vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(1,2))
    
    def extract_features(self, request_data):
        """Extract numerical and text features from HTTP request"""
        features = []
        
        # Basic request metrics
        features.append(len(request_data.get('uri', '')))
        features.append(len(request_data.get('query_string', '')))
        features.append(len(request_data.get('request_body', '')))
        features.append(len(request_data.get('headers', {})))
        
        # Character distribution anomalies
        uri = request_data.get('uri', '')
        features.append(uri.count('/'))
        features.append(uri.count('.'))
        features.append(uri.count('?'))
        features.append(uri.count('&'))
        features.append(uri.count('%'))
        
        # Entropy calculation for randomness detection
        if uri:
            entropy = self.calculate_entropy(uri)
            features.append(entropy)
        else:
            features.append(0)
            
        return np.array(features).reshape(1, -1)
    
    def calculate_entropy(self, text):
        """Calculate Shannon entropy of text"""
        if not text:
            return 0
        prob = [text.count(c) / len(text) for c in set(text)]
        entropy = -sum(p * np.log2(p) for p in prob if p > 0)
        return entropy
    
    def detect_anomaly(self, request_data):
        """Main anomaly detection function"""
        try:
            # Extract numerical features
            numerical_features = self.extract_features(request_data)
            
            # Extract text features
            text_data = ' '.join([
                request_data.get('uri', ''),
                request_data.get('query_string', ''),
                request_data.get('request_body', '')[:1000]  # Limit body size
            ])
            
            # Check if models need training (first run)
            if not hasattr(self.isolation_forest, 'decision_function'):
                # Use current request as baseline (in production, train on clean data)
                scaled_features = self.scaler.fit_transform(numerical_features)
                text_features = self.vectorizer.fit_transform([text_data])
                combined_features = np.hstack([scaled_features, text_features.toarray()])
                self.isolation_forest.fit(combined_features)
                self.save_models()
                return {'anomaly_score': 0, 'is_anomaly': False}
            
            # Transform features
            scaled_features = self.scaler.transform(numerical_features)
            text_features = self.vectorizer.transform([text_data])
            combined_features = np.hstack([scaled_features, text_features.toarray()])
            
            # Get anomaly score
            anomaly_score = self.isolation_forest.decision_function(combined_features)[0]
            is_anomaly = self.isolation_forest.predict(combined_features)[0] == -1
            
            # Convert to 0-10 scale for ModSecurity
            normalized_score = max(0, min(10, (1 - anomaly_score) * 5))
            
            return {
                'anomaly_score': normalized_score,
                'is_anomaly': is_anomaly,
                'raw_score': anomaly_score
            }
            
        except Exception as e:
            return {'anomaly_score': 0, 'is_anomaly': False, 'error': str(e)}
    
    def save_models(self):
        """Save trained models to disk"""
        os.makedirs(self.model_path, exist_ok=True)
        with open(os.path.join(self.model_path, 'isolation_forest.pkl'), 'wb') as f:
            pickle.dump(self.isolation_forest, f)
        with open(os.path.join(self.model_path, 'scaler.pkl'), 'wb') as f:
            pickle.dump(self.scaler, f)
        with open(os.path.join(self.model_path, 'vectorizer.pkl'), 'wb') as f:
            pickle.dump(self.vectorizer, f)

def main():
    if len(sys.argv) < 2:
        print(json.dumps({'error': 'No request data provided'}))
        return
    
    try:
        request_data = json.loads(sys.argv[1])
        detector = ModSecurityMLDetector()
        result = detector.detect_anomaly(request_data)
        print(json.dumps(result))
    except Exception as e:
        print(json.dumps({'error': str(e), 'anomaly_score': 0}))

if __name__ == '__main__':
    main()

Make the ML script executable

Set proper permissions for the machine learning detection script.

sudo chmod +x /etc/modsecurity/ml_detector.py
sudo chown www-data:www-data /etc/modsecurity/ml_detector.py

Create ModSecurity ML integration rules

Configure ModSecurity rules that call the machine learning script and act on anomaly scores.

# Machine Learning Anomaly Detection Rules

Phase 1: Initialize ML variables

SecRule REQUEST_METHOD "@unconditionalMatch" \ "id:100001,\ phase:1,\ nolog,\ pass,\ t:none,\ setvar:'tx.ml_request_data={\"uri\":\"%{REQUEST_URI}\",\"query_string\":\"%{QUERY_STRING}\",\"method\":\"%{REQUEST_METHOD}\",\"headers\":{\"user_agent\":\"%{REQUEST_HEADERS.User-Agent}\",\"host\":\"%{REQUEST_HEADERS.Host}\"},\"request_body\":\"%{REQUEST_BODY}\"}'"

Phase 2: Execute ML detection

SecRule REQUEST_METHOD "@unconditionalMatch" \ "id:100002,\ phase:2,\ pass,\ t:none,\ exec:/etc/modsecurity/ml_detector.py '%{tx.ml_request_data}',\ setvar:'tx.ml_result=%{EXEC}'"

Phase 2: Parse ML results and set anomaly score

SecRule TX:ml_result "@rx \"anomaly_score\":\s*(\d+(?:\.\d+)?)" \ "id:100003,\ phase:2,\ capture,\ pass,\ t:none,\ setvar:'tx.ml_anomaly_score=%{tx.1}',\ setvar:'tx.anomaly_score=+%{tx.1}',\ logdata:'ML Anomaly Score: %{tx.ml_anomaly_score}'"

Phase 2: High anomaly score detection

SecRule TX:ml_anomaly_score "@gt 7" \ "id:100004,\ phase:2,\ block,\ msg:'High ML anomaly score detected',\ logdata:'ML Score: %{tx.ml_anomaly_score}, Request: %{tx.ml_request_data}',\ setvar:'tx.anomaly_score=+5'"

Phase 2: Medium anomaly score detection

SecRule TX:ml_anomaly_score "@ge 5" \ "id:100005,\ phase:2,\ pass,\ msg:'Medium ML anomaly score detected',\ logdata:'ML Score: %{tx.ml_anomaly_score}',\ setvar:'tx.anomaly_score=+3'"

Phase 5: Block based on cumulative anomaly score

SecRule TX:ANOMALY_SCORE "@ge %{tx.inbound_anomaly_score_threshold}" \ "id:100006,\ phase:2,\ block,\ msg:'Inbound Anomaly Score Exceeded (Total Score: %{tx.anomaly_score})',\ logdata:'Total anomaly score: %{tx.anomaly_score}, ML contribution: %{tx.ml_anomaly_score}'"

Learning mode rule - log but don't block

SecRule TX:ml_anomaly_score "@gt 8" \ "id:100007,\ phase:5,\ pass,\ msg:'ML Learning Mode - High Anomaly Detected',\ logdata:'Learning: ML Score %{tx.ml_anomaly_score}, URI: %{REQUEST_URI}, IP: %{REMOTE_ADDR}'"

Configure Apache virtual host with ModSecurity

Set up Apache virtual host to use ModSecurity with machine learning enabled.


    ServerName example.com
    DocumentRoot /var/www/html
    
    # SSL Configuration
    SSLEngine on
    SSLCertificateFile /etc/ssl/certs/ssl-cert-snakeoil.pem
    SSLCertificateKeyFile /etc/ssl/private/ssl-cert-snakeoil.key
    
    # ModSecurity Configuration
    ModSecurity3Engine On
    ModSecurity3RulesFile /etc/modsecurity/modsecurity.conf
    ModSecurity3RulesFile /etc/modsecurity/ml_rules.conf
    ModSecurity3RulesFile /usr/share/modsecurity-crs/crs-setup.conf
    ModSecurity3RulesFile /usr/share/modsecurity-crs/rules/*.conf
    
    # Logging
    ErrorLog ${APACHE_LOG_DIR}/ssl_error.log
    CustomLog ${APACHE_LOG_DIR}/ssl_access.log combined
    
    # Security Headers
    Header always set X-Content-Type-Options nosniff
    Header always set X-Frame-Options DENY
    Header always set X-XSS-Protection "1; mode=block"

    ServerName example.com
    DocumentRoot /var/www/html
    
    # SSL Configuration
    SSLEngine on
    SSLCertificateFile /etc/pki/tls/certs/localhost.crt
    SSLCertificateKeyFile /etc/pki/tls/private/localhost.key
    
    # ModSecurity Configuration
    
        ModSecurity3Engine On
        ModSecurity3RulesFile /etc/modsecurity/modsecurity.conf
        ModSecurity3RulesFile /etc/modsecurity/ml_rules.conf
    
    
    # Logging
    ErrorLog logs/ssl_error_log
    TransferLog logs/ssl_access_log
    LogLevel warn

Create automated threat response script

Build a script that monitors ModSecurity logs and automatically responds to threats.

#!/usr/bin/env python3
import re
import json
import subprocess
import time
from datetime import datetime, timedelta
from collections import defaultdict
import os

class ThreatResponseSystem:
    def __init__(self):
        self.log_file = '/var/log/modsecurity/audit.log'
        self.blocked_ips = set()
        self.threat_counts = defaultdict(int)
        self.last_check = datetime.now()
        
    def parse_modsec_log(self, log_line):
        """Parse ModSecurity audit log entries"""
        try:
            # Extract relevant information from audit log
            ip_match = re.search(r'"client_ip":"([^"]+)"', log_line)
            score_match = re.search(r'ML Score: ([\d.]+)', log_line)
            rule_match = re.search(r'id "(\d+)"', log_line)
            
            if ip_match:
                return {
                    'ip': ip_match.group(1),
                    'ml_score': float(score_match.group(1)) if score_match else 0,
                    'rule_id': rule_match.group(1) if rule_match else None,
                    'timestamp': datetime.now()
                }
        except Exception as e:
            print(f"Error parsing log: {e}")
        return None
    
    def should_block_ip(self, ip, ml_score):
        """Determine if IP should be blocked based on threat score and frequency"""
        self.threat_counts[ip] += 1
        
        # Block criteria
        if ml_score > 8:  # Very high anomaly score
            return True
        if self.threat_counts[ip] > 5 and ml_score > 6:  # Repeated medium threats
            return True
        if self.threat_counts[ip] > 10:  # Too many requests total
            return True
            
        return False
    
    def block_ip(self, ip):
        """Block IP using iptables"""
        if ip in self.blocked_ips:
            return False
            
        try:
            # Add iptables rule
            subprocess.run([
                'iptables', '-I', 'INPUT', '-s', ip, '-j', 'DROP'
            ], check=True)
            
            self.blocked_ips.add(ip)
            print(f"Blocked IP: {ip}")
            
            # Log the action
            with open('/var/log/modsecurity/threat_response.log', 'a') as f:
                f.write(f"{datetime.now().isoformat()} - BLOCKED {ip}\n")
                
            return True
        except subprocess.CalledProcessError as e:
            print(f"Failed to block IP {ip}: {e}")
            return False
    
    def unblock_ip(self, ip):
        """Remove IP from blocked list after timeout"""
        try:
            subprocess.run([
                'iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP'
            ], check=True)
            
            if ip in self.blocked_ips:
                self.blocked_ips.remove(ip)
            print(f"Unblocked IP: {ip}")
            return True
        except subprocess.CalledProcessError:
            return False
    
    def cleanup_old_blocks(self):
        """Remove IP blocks older than 1 hour"""
        # This is a simplified version - in production, track block timestamps
        cutoff_time = datetime.now() - timedelta(hours=1)
        
        # Clean up threat counts older than 24 hours
        if datetime.now() - self.last_check > timedelta(hours=24):
            self.threat_counts.clear()
            self.last_check = datetime.now()
    
    def monitor_logs(self):
        """Main monitoring loop"""
        print("Starting threat response monitoring...")
        
        if not os.path.exists(self.log_file):
            print(f"Log file {self.log_file} not found")
            return
            
        # Follow log file
        subprocess.Popen(['touch', self.log_file])
        
        with open(self.log_file, 'r') as f:
            # Go to end of file
            f.seek(0, 2)
            
            while True:
                line = f.readline()
                if line:
                    parsed = self.parse_modsec_log(line)
                    if parsed and parsed['ml_score'] > 5:
                        ip = parsed['ip']
                        score = parsed['ml_score']
                        
                        print(f"Threat detected: IP {ip}, ML Score: {score}")
                        
                        if self.should_block_ip(ip, score):
                            self.block_ip(ip)
                else:
                    time.sleep(1)
                    self.cleanup_old_blocks()

if __name__ == '__main__':
    try:
        response_system = ThreatResponseSystem()
        response_system.monitor_logs()
    except KeyboardInterrupt:
        print("\nThreat response monitoring stopped.")
    except Exception as e:
        print(f"Error: {e}")

Set up threat response service

Create systemd service for automated threat response and enable it.

[Unit]
Description=ModSecurity Threat Response System
After=network.target apache2.service
Requires=apache2.service

[Service]
Type=simple
User=root
ExecStart=/usr/bin/python3 /etc/modsecurity/threat_response.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

Enable and start services

Enable all services and start ModSecurity with machine learning protection.

sudo chmod +x /etc/modsecurity/threat_response.py
sudo systemctl daemon-reload
sudo systemctl enable modsec-threat-response
sudo a2ensite 000-default-ssl
sudo a2enmod ssl
sudo systemctl restart apache2
sudo systemctl start modsec-threat-response
sudo chmod +x /etc/modsecurity/threat_response.py
sudo systemctl daemon-reload
sudo systemctl enable modsec-threat-response
sudo systemctl restart httpd
sudo systemctl start modsec-threat-response

Create ML model training script

Build a script to train the machine learning models on clean traffic data.

#!/usr/bin/env python3
import json
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import pickle
import os
from datetime import datetime

def train_models_from_clean_data(clean_log_file, model_path='/etc/modsecurity/models'):
    """Train ML models using clean traffic data"""
    print("Training ML models from clean data...")
    
    training_data = []
    
    # Read clean traffic logs
    try:
        with open(clean_log_file, 'r') as f:
            for line in f:
                try:
                    data = json.loads(line)
                    training_data.append(data)
                except json.JSONDecodeError:
                    continue
    except FileNotFoundError:
        print(f"Clean data file {clean_log_file} not found")
        return False
    
    if len(training_data) < 10:
        print("Not enough training data (minimum 10 samples)")
        return False
    
    # Extract features
    numerical_features = []
    text_data = []
    
    for data in training_data:
        # Extract numerical features
        features = [
            len(data.get('uri', '')),
            len(data.get('query_string', '')),
            len(data.get('request_body', '')),
            len(data.get('headers', {})),
            data.get('uri', '').count('/'),
            data.get('uri', '').count('.'),
            data.get('uri', '').count('?'),
            data.get('uri', '').count('&'),
            data.get('uri', '').count('%'),
            calculate_entropy(data.get('uri', ''))
        ]
        numerical_features.append(features)
        
        # Extract text data
        text = ' '.join([
            data.get('uri', ''),
            data.get('query_string', ''),
            data.get('request_body', '')[:1000]
        ])
        text_data.append(text)
    
    # Train models
    scaler = StandardScaler()
    vectorizer = TfidfVectorizer(max_features=1000, ngram_range=(1,2))
    isolation_forest = IsolationForest(contamination=0.1, random_state=42)
    
    # Fit transformers
    scaled_features = scaler.fit_transform(numerical_features)
    text_features = vectorizer.fit_transform(text_data)
    
    # Combine features and train
    combined_features = np.hstack([scaled_features, text_features.toarray()])
    isolation_forest.fit(combined_features)
    
    # Save models
    os.makedirs(model_path, exist_ok=True)
    
    with open(os.path.join(model_path, 'isolation_forest.pkl'), 'wb') as f:
        pickle.dump(isolation_forest, f)
    with open(os.path.join(model_path, 'scaler.pkl'), 'wb') as f:
        pickle.dump(scaler, f)
    with open(os.path.join(model_path, 'vectorizer.pkl'), 'wb') as f:
        pickle.dump(vectorizer, f)
    
    print(f"Models trained and saved to {model_path}")
    print(f"Training data: {len(training_data)} samples")
    return True

def calculate_entropy(text):
    """Calculate Shannon entropy"""
    if not text:
        return 0
    prob = [text.count(c) / len(text) for c in set(text)]
    return -sum(p * np.log2(p) for p in prob if p > 0)

def create_sample_clean_data():
    """Create sample clean data for initial training"""
    clean_data = [
        {"uri": "/", "query_string": "", "request_body": "", "headers": {"User-Agent": "Mozilla/5.0"}},
        {"uri": "/about", "query_string": "", "request_body": "", "headers": {"User-Agent": "Chrome/91.0"}},
        {"uri": "/contact", "query_string": "name=john", "request_body": "", "headers": {}},
        {"uri": "/products", "query_string": "category=electronics", "request_body": "", "headers": {}},
        {"uri": "/search", "query_string": "q=laptop", "request_body": "", "headers": {}},
        {"uri": "/api/users", "query_string": "", "request_body": '{"name":"test"}', "headers": {}},
        {"uri": "/login", "query_string": "", "request_body": "username=user&password=pass", "headers": {}},
        {"uri": "/css/style.css", "query_string": "", "request_body": "", "headers": {}},
        {"uri": "/js/app.js", "query_string": "", "request_body": "", "headers": {}},
        {"uri": "/images/logo.png", "query_string": "", "request_body": "", "headers": {}}
    ]
    
    with open('/var/lib/modsecurity/clean_data.json', 'w') as f:
        for data in clean_data:
            f.write(json.dumps(data) + '\n')
    
    print("Sample clean data created at /var/lib/modsecurity/clean_data.json")

if __name__ == '__main__':
    create_sample_clean_data()
    success = train_models_from_clean_data('/var/lib/modsecurity/clean_data.json')
    if success:
        print("ML models training completed successfully")
    else:
        print("ML models training failed")

Train initial ML models

Run the model training script to create initial machine learning models.

sudo chmod +x /etc/modsecurity/train_models.py
sudo python3 /etc/modsecurity/train_models.py
sudo chown -R www-data:www-data /etc/modsecurity/models

Configure ML tuning and monitoring

Create ML performance monitoring script

Build monitoring to track machine learning model performance and accuracy.

#!/usr/bin/env python3
import json
import time
import os
from datetime import datetime, timedelta
from collections import defaultdict
import subprocess

class MLPerformanceMonitor:
    def __init__(self):
        self.stats_file = '/var/log/modsecurity/ml_stats.json'
        self.stats = defaultdict(int)
        self.false_positives = []
        self.load_stats()
    
    def load_stats(self):
        """Load existing statistics"""
        try:
            with open(self.stats_file, 'r') as f:
                data = json.load(f)
                self.stats.update(data.get('stats', {}))
                self.false_positives = data.get('false_positives', [])
        except (FileNotFoundError, json.JSONDecodeError):
            pass
    
    def save_stats(self):
        """Save statistics to file"""
        data = {
            'stats': dict(self.stats),
            'false_positives': self.false_positives[-100:],  # Keep last 100
            'last_updated': datetime.now().isoformat()
        }
        
        with open(self.stats_file, 'w') as f:
            json.dump(data, f, indent=2)
    
    def analyze_log_entry(self, log_line):
        """Analyze ModSecurity log entries for ML performance"""
        if 'ML Score' in log_line:
            self.stats['ml_detections'] += 1
            
            # Extract ML score
            import re
            score_match = re.search(r'ML Score: ([\d.]+)', log_line)
            if score_match:
                score = float(score_match.group(1))
                
                if score > 8:
                    self.stats['high_risk_detections'] += 1
                elif score > 5:
                    self.stats['medium_risk_detections'] += 1
                else:
                    self.stats['low_risk_detections'] += 1
        
        if 'blocked' in log_line.lower():
            self.stats['blocked_requests'] += 1
    
    def check_model_performance(self):
        """Check if model needs retraining"""
        total_detections = self.stats.get('ml_detections', 0)
        high_risk = self.stats.get('high_risk_detections', 0)
        
        if total_detections > 0:
            high_risk_ratio = high_risk / total_detections
            
            # If too many high-risk detections, model may be too sensitive
            if high_risk_ratio > 0.3:
                return 'too_sensitive'
            # If too few high-risk detections, model may be too lenient
            elif high_risk_ratio < 0.05:
                return 'too_lenient'
        
        return 'normal'
    
    def generate_report(self):
        """Generate performance report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'total_ml_detections': self.stats.get('ml_detections', 0),
            'high_risk_detections': self.stats.get('high_risk_detections', 0),
            'medium_risk_detections': self.stats.get('medium_risk_detections', 0),
            'low_risk_detections': self.stats.get('low_risk_detections', 0),
            'blocked_requests': self.stats.get('blocked_requests', 0),
            'model_status': self.check_model_performance(),
            'false_positives_count': len(self.false_positives)
        }
        
        return report
    
    def monitor_continuously(self):
        """Main monitoring loop"""
        audit_log = '/var/log/modsecurity/audit.log'
        
        if not os.path.exists(audit_log):
            print(f"Audit log {audit_log} not found")
            return
        
        print("Starting ML performance monitoring...")
        
        with open(audit_log, 'r') as f:
            f.seek(0, 2)  # Go to end
            
            while True:
                line = f.readline()
                if line:
                    self.analyze_log_entry(line)
                    
                    # Save stats every 100 detections
                    if self.stats['ml_detections'] % 100 == 0:
                        self.save_stats()
                        report = self.generate_report()
                        print(f"ML Stats: {json.dumps(report, indent=2)}")
                else:
                    time.sleep(5)
                    
                    # Generate hourly reports
                    if datetime.now().minute == 0:
                        self.save_stats()
                        report = self.generate_report()
                        
                        with open('/var/log/modsecurity/ml_report.json', 'w') as f:
                            json.dump(report, f, indent=2)

if __name__ == '__main__':
    try:
        monitor = MLPerformanceMonitor()
        monitor.monitor_continuously()
    except KeyboardInterrupt:
        print("\nML monitoring stopped.")

Set up model retraining cron job

Create automated model retraining based on new clean traffic data.

sudo chmod +x /etc/modsecurity/ml_monitor.py
sudo crontab -e

Add these entries to retrain models weekly:

# Retrain ML models weekly with accumulated clean data
0 2   0 /usr/bin/python3 /etc/modsecurity/train_models.py >> /var/log/modsecurity/training.log 2>&1

Generate daily ML performance reports

0 6 * /usr/bin/python3 /etc/modsecurity/ml_monitor.py --report >> /var/log/modsecurity/ml_performance.log 2>&1

Verify your setup

Test the machine learning anomaly detection system with various request patterns.

# Check ModSecurity is loaded
sudo apache2ctl -M | grep security

Verify ML models exist

sudo ls -la /etc/modsecurity/models/

Test normal request (should pass)

curl -k https://example.com/

Test suspicious request (should trigger ML detection)

curl -k "https://example.com/admin.php?cmd=ls%20-la&exec=cat%20/etc/passwd"

Check audit logs for ML scoring

sudo tail -f /var/log/modsecurity/audit.log | grep "ML Score"

Verify threat response is running

sudo systemctl status modsec-threat-response

Check blocked IPs

sudo iptables -L INPUT | grep DROP

Test ML detector directly

echo '{"uri":"/test","query_string":"","request_body":"","headers":{}}' | sudo -u www-data python3 /etc/modsecurity/ml_detector.py
Note: The ML models improve over time. Initial detection may show false positives until the system learns your normal traffic patterns.

Common issues

SymptomCauseFix
Python import errorsMissing ML dependenciessudo pip3 install numpy scipy scikit-learn pandas
ML models not foundTraining script didn't runsudo python3 /etc/modsecurity/train_models.py
Permission denied on modelsWrong file ownershipsudo chown -R www-data:www-data /etc/modsecurity/models
Too many false positivesModel too sensitiveIncrease threshold in /etc/modsecurity/ml_rules.conf from 5 to 7
Threat response not blockingService not runningsudo systemctl start modsec-threat-response
No ML scores in logsRules not loadedCheck ModSecurity3RulesFile path in virtual host config

Tune ML detection rules

Fine-tune the machine learning detection based on your traffic patterns and security requirements.

Edit the anomaly thresholds in /etc/modsecurity/ml_rules.conf:

# For stricter security (more blocking)
sudo sed -i 's/@gt 7/@gt 5/g' /etc/modsecurity/ml_rules.conf

For looser security (less false positives)

sudo sed -i 's/@gt 7/@gt 9/g' /etc/modsecurity/ml_rules.conf

Restart Apache to apply changes

sudo systemctl restart apache2

Monitor and adjust the learning rate by collecting more clean training data:

# Extract clean traffic from audit logs (manual review required)
sudo grep -v "anomaly" /var/log/modsecurity/audit.log | head -100 > /var/lib/modsecurity/additional_clean_data.json

Retrain with additional data

sudo python3 /etc/modsecurity/train_models.py

For better integration with your security infrastructure, consider linking to centralized ModSecurity log analysis or SOAR platform integration for automated incident response.

Next steps

Running this in production?

Want this handled for you? Running machine learning threat detection at scale adds a second layer of work: model tuning, performance monitoring, threat response orchestration, and 24/7 incident handling. Our managed platform covers monitoring, backups and response by default.

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We handle infrastructure security hardening for businesses that depend on uptime. From initial setup to ongoing operations.