Integrate ClamAV cluster with file upload APIs for scalable malware scanning

Advanced 45 min Jun 03, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up a clustered ClamAV deployment with REST API endpoints for automated malware scanning of file uploads. Configure load balancing, authentication, and monitoring for high-availability antivirus scanning in production environments.

Prerequisites

  • At least 3 servers (2 ClamAV nodes + 1 load balancer)
  • 2GB RAM per ClamAV node
  • Basic knowledge of REST APIs
  • SSL certificates for production

What this solves

File uploads in web applications create security risks through malware, viruses, and malicious payloads. A clustered ClamAV deployment with REST API integration provides automatic scanning of uploads across multiple servers with load balancing and failover. This setup handles high-volume file scanning for enterprise applications while maintaining security and availability.

Prerequisites

You'll need at least three servers for this setup: two ClamAV nodes and one load balancer. Each ClamAV node needs at least 2GB RAM and 10GB of storage for virus definitions. If you haven't worked with ClamAV clustering before, review the basics of a ClamAV cluster first.

Step-by-step configuration

Install ClamAV and dependencies

Install ClamAV daemon and REST API tools on each scanning node. This includes the core antivirus engine and HTTP interface components.

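# Ubuntu 24.04 / Debian 12
sudo apt update
sudo apt install -y clamav clamav-daemon clamav-freshclam python3 python3-pip nginx
# These releases block system-wide pip installs (PEP 668), so use the distro packages
sudo apt install -y python3-flask python3-requests gunicorn

# AlmaLinux 9 / Rocky Linux 9 (ClamAV comes from EPEL, where the daemon package is named clamd)
sudo dnf update -y
sudo dnf install -y epel-release
sudo dnf install -y clamav clamd clamav-update python3 python3-pip nginx
sudo pip3 install flask gunicorn requests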

Configure ClamAV daemon

Configure ClamAV to accept TCP connections and optimize for API usage. This enables remote scanning requests from the REST API layer.

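# /etc/clamav/clamd.conf

# Network configuration
# clamd's TCP socket has no authentication: restrict port 3310 with a firewall,
# or bind to 127.0.0.1 if only the API on the same node connects to it
TCPSocket 3310
TCPAddr 0.0.0.0
MaxConnectionQueueLength 50
MaxThreads 12

# Performance tuning
MaxDirectoryRecursion 30
MaxFileSize 100M
MaxScanSize 200M
MaxFiles 10000

# Logging
LogFile /var/log/clamav/clamav.log
LogFileMaxSize 10M
LogRotate yes
LogVerbose yes
LogTime yes

# Security
User clamav
LocalSocket /var/run/clamav/clamd.ctl
LocalSocketGroup clamav
# 660 limits the socket to the clamav user and group; 666 would let any local user talk to clamd
LocalSocketMode 660
FixStaleSocket yes

# Disable local-only restrictions
# If the packaged config already contains a LocalSocket line, comment it out so the directive is not set twice.
# TCPSocket is disabled by default, so the network lines above must be present and uncommented.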

Update virus definitions

Download the latest virus signatures and configure automatic updates. Fresh definitions are critical for detecting new threats.

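# Stop the updater first: a running clamav-freshclam service holds the log lock and makes a manual run fail
sudo systemctl stop clamav-freshclam
sudo freshclam
sudo systemctl enable clamav-freshclam
sudo systemctl start clamav-freshclam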
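
Because detection depends on how recent the signatures are, it can be useful to check the age of the database a node has loaded. The sketch below parses the reply clamd gives to its VERSION command. The `ClamAV <engine>/<db version>/<date>` layout is the form clamd typically returns rather than something this guide specifies, and the sample reply, the function names, and the two-day threshold are illustrative assumptions.

```python
from datetime import datetime, timedelta


def parse_clamd_version(reply: str) -> dict:
    """Split a reply of the assumed form 'ClamAV <engine>/<db version>/<date>'."""
    engine, db_version, db_date = reply.strip().split("/", 2)
    return {
        "engine": engine.replace("ClamAV ", "", 1),
        "db_version": int(db_version),
        # strptime treats a run of whitespace as one separator, so 'Jun  3' parses
        "db_date": datetime.strptime(db_date, "%a %b %d %H:%M:%S %Y"),
    }


def signatures_are_stale(reply: str, now: datetime,
                         max_age: timedelta = timedelta(days=2)) -> bool:
    """Return True when the signature database is older than max_age."""
    return now - parse_clamd_version(reply)["db_date"] > max_age


# Hypothetical reply used only for illustration
sample = "ClamAV 1.0.5/27291/Mon Jun  3 08:30:12 2024"
info = parse_clamd_version(sample)
print(info["engine"], info["db_version"], info["db_date"].isoformat())
```

A check like this could feed the health endpoint or an alert, so a node with a stuck updater is noticed before it silently misses new threats.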

Start ClamAV daemon

Enable and start the ClamAV daemon service. Verify it's listening on the correct port for API connections.

sudo systemctl enable clamav-daemon
sudo systemctl start clamav-daemon
sudo systemctl status clamav-daemon
# ss ships with iproute2 on all four distributions; netstat needs the separate net-tools package
sudo ss -tlnp | grep 3310

Create REST API service

Build a Flask application that provides HTTP endpoints for file scanning. This wraps ClamAV's socket interface in REST API calls.

#!/usr/bin/env python3
# Save as /opt/clamav-api/app.py
from flask import Flask, request, jsonify
import socket
import tempfile
import os
import hashlib
import hmac
from datetime import datetime, timezone
import logging

app = Flask(__name__)

# Configure logging
logging.basicConfig(
    filename='/var/log/clamav/api.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s'
)

# Configuration
CLAMAV_HOST = 'localhost'
CLAMAV_PORT = 3310
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
API_KEY = 'your-secure-api-key-here'  # Replace before deploying


def clamd_command(command, timeout=60):
    """Send one newline-terminated command to clamd and return its reply"""
    with socket.create_connection((CLAMAV_HOST, CLAMAV_PORT), timeout=timeout) as sock:
        sock.sendall(command + b"\n")
        return sock.recv(4096).decode(errors='replace').strip()


def scan_file(file_path):
    """Scan file using ClamAV daemon"""
    try:
        return clamd_command(f"SCAN {file_path}".encode())
    except Exception as e:
        logging.error(f"Scan error: {e}")
        return f"ERROR: {e}"


def verify_api_key():
    """Verify API key from request headers"""
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return False
    token = auth_header[len('Bearer '):]
    # Constant-time comparison avoids leaking key contents through timing
    return hmac.compare_digest(token.encode(), API_KEY.encode())


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    try:
        response = clamd_command(b"PING", timeout=5)
        if "PONG" in response:
            return jsonify({"status": "healthy", "clamav": "connected"}), 200
        return jsonify({"status": "unhealthy", "error": "ClamAV not responding"}), 503
    except Exception as e:
        return jsonify({"status": "unhealthy", "error": str(e)}), 503


@app.route('/scan', methods=['POST'])
def scan_upload():
    """Scan uploaded file"""
    if not verify_api_key():
        return jsonify({"error": "Invalid or missing API key"}), 401

    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "Empty file name"}), 400

    # Check file size
    file.seek(0, 2)  # Seek to end
    size = file.tell()
    file.seek(0)  # Reset to beginning

    if size > MAX_FILE_SIZE:
        return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE} bytes"}), 413

    # Create temporary file
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        file.save(temp_file)
        temp_path = temp_file.name

    try:
        # clamd runs as the clamav user, so it must be able to read the file
        os.chmod(temp_path, 0o644)

        # Calculate file hash in chunks to avoid loading 100MB into memory
        sha256 = hashlib.sha256()
        with open(temp_path, 'rb') as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b''):
                sha256.update(chunk)
        file_hash = sha256.hexdigest()

        # Scan file
        scan_result = scan_file(temp_path)

        # Fail closed: an ERROR reply means the file was never scanned
        if scan_result.startswith("ERROR") or scan_result.endswith("ERROR"):
            logging.error(f"Scan failed for {file.filename}: {scan_result}")
            return jsonify({"error": "Scan failed", "details": scan_result}), 500

        # Parse result, e.g. "/tmp/tmpab12: Eicar-Signature FOUND"
        is_infected = scan_result.endswith("FOUND")
        threat_name = None
        if is_infected:
            parts = scan_result.split()
            if len(parts) >= 2:
                threat_name = parts[1]

        response = {
            "filename": file.filename,
            "file_size": size,
            "file_hash": file_hash,
            "scan_time": datetime.now(timezone.utc).isoformat(),
            "is_infected": is_infected,
            "threat_name": threat_name,
            "scan_result": scan_result,
            "node_id": socket.gethostname()
        }

        # Log scan result
        logging.info(f"Scanned {file.filename}: {scan_result}")

        status_code = 200 if not is_infected else 409
        return jsonify(response), status_code

    except Exception as e:
        logging.error(f"Scan failed for {file.filename}: {e}")
        return jsonify({"error": "Scan failed", "details": str(e)}), 500

    finally:
        # Clean up temporary file
        if os.path.exists(temp_path):
            os.unlink(temp_path)


@app.route('/version', methods=['GET'])
def get_version():
    """Get ClamAV version info"""
    try:
        version = clamd_command(b"VERSION", timeout=5)
        return jsonify({"version": version, "node_id": socket.gethostname()}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)
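
The SCAN command used above hands clamd a path, so clamd must be able to read the temporary file itself. An alternative is clamd's INSTREAM command, which sends the bytes over the socket instead. The following is a minimal sketch of that framing, not part of the guide's API: the function names and chunk size are assumptions, and clamd's `StreamMaxLength` setting caps how much data a stream may carry.

```python
import socket
import struct
from typing import BinaryIO, Iterator, Optional, Tuple

CHUNK_SIZE = 64 * 1024  # illustrative chunk size


def instream_frames(stream: BinaryIO, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield the byte sequence clamd expects for one INSTREAM session."""
    yield b"zINSTREAM\0"
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # Each chunk is prefixed with its length as a 4-byte big-endian integer
        yield struct.pack("!I", len(chunk)) + chunk
    # A zero-length chunk tells clamd the stream is complete
    yield struct.pack("!I", 0)


def parse_reply(reply: str) -> Tuple[str, Optional[str]]:
    """Map a clamd reply to ('clean' | 'infected' | 'error', threat name)."""
    text = reply.strip("\0\n ")
    _, _, verdict = text.partition(": ")
    if verdict.endswith(" FOUND"):
        return "infected", verdict[: -len(" FOUND")]
    if verdict == "OK":
        return "clean", None
    # Anything else (including replies ending in ERROR) is not a verdict
    return "error", None


def scan_stream(stream: BinaryIO, host: str = "localhost", port: int = 3310,
                timeout: float = 60.0) -> Tuple[str, Optional[str]]:
    """Stream a file-like object to clamd and return the parsed verdict."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        for frame in instream_frames(stream):
            sock.sendall(frame)
        reply = sock.recv(4096).decode(errors="replace")
    return parse_reply(reply)
```

With this approach, `scan_stream(request.files['file'].stream)` could replace the temporary file entirely, at the cost of holding the connection open for the duration of the upload.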

Create systemd service

Configure the API as a system service for automatic startup and process management. This ensures the API restarts after system reboots.

# /etc/systemd/system/clamav-api.service
[Unit]
Description=ClamAV REST API
After=network.target clamav-daemon.service
Requires=clamav-daemon.service

[Service]
Type=exec
User=www-data
Group=www-data
WorkingDirectory=/opt/clamav-api
# Bind to all interfaces so the load balancer on another host can reach the node;
# restrict port 8080 to the load balancer's address with a firewall
ExecStart=/usr/bin/gunicorn --bind 0.0.0.0:8080 --workers 4 --timeout 60 app:app
Restart=always
RestartSec=10

# Security settings
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/var/log/clamav /tmp

# Resource limits
LimitNOFILE=1024
LimitNPROC=512

[Install]
WantedBy=multi-user.target

Set up API directory and permissions

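Create the application directory and set correct ownership. The www-data user needs access to run the Flask application and to write its own log file. On AlmaLinux and Rocky Linux there is no www-data account by default, so substitute a service account such as nginx here and in the unit file.

sudo mkdir -p /opt/clamav-api
sudo mkdir -p /var/log/clamav
sudo chown -R www-data:www-data /opt/clamav-api
sudo chown -R clamav:clamav /var/log/clamav
# The API runs as www-data and cannot create files in a directory owned by clamav,
# so create its log file up front and hand it over
sudo touch /var/log/clamav/api.log
sudo chown www-data:www-data /var/log/clamav/api.log
sudo chmod 755 /opt/clamav-api
sudo chmod 644 /opt/clamav-api/app.py
sudo chmod 755 /var/log/clamav

Never use chmod 777. It gives every user on the system full access to your files. Instead, use specific ownership with chown and minimal permissions like 755 for directories and 644 for files.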

Start the API service

Enable and start the REST API service. Verify it's running and accessible on the correct port.

sudo systemctl daemon-reload
sudo systemctl enable clamav-api
sudo systemctl start clamav-api
sudo systemctl status clamav-api
sudo ss -tlnp | grep 8080

Configure HAProxy load balancer

Set up HAProxy on a separate server to distribute requests across ClamAV nodes. This provides high availability and horizontal scaling.

# Ubuntu 24.04 / Debian 12
sudo apt install -y haproxy
# AlmaLinux 9 / Rocky Linux 9
sudo dnf install -y haproxy

# /etc/haproxy/haproxy.cfg
global
    log stdout local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s
    user haproxy
    group haproxy
    daemon

defaults
    mode http
    log global
    option httplog
    option dontlognull
    option log-health-checks
    timeout connect 5000
    timeout client 60000
    timeout server 60000
    timeout http-request 15s
    timeout http-keep-alive 15s

    # Statistics interface
    stats enable
    stats uri /stats
    stats refresh 30s

# ClamAV API backend
backend clamav_api
    balance roundrobin
    option httpchk GET /health
    http-check expect status 200
    # Add your ClamAV nodes here
    server clamav1 203.0.113.10:8080 check inter 10s fall 3 rise 2
    server clamav2 203.0.113.11:8080 check inter 10s fall 3 rise 2
    server clamav3 203.0.113.12:8080 check inter 10s fall 3 rise 2

# Frontend for API requests
frontend clamav_frontend
    bind *:80
    bind *:443 ssl crt /etc/ssl/certs/clamav-cluster.pem

    # Force HTTPS
    redirect scheme https if !{ ssl_fc }

    # Rate limiting: more than 20 requests per 10 seconds from one IP is rejected
    stick-table type ip size 100k expire 30s store http_req_rate(10s)
    http-request track-sc0 src
    http-request reject if { sc_http_req_rate(0) gt 20 }

    # API key validation (presence only; the API on each node checks the value)
    http-request reject unless { req.hdr(authorization) -m found }

    # stats admin is not accepted in a defaults section, so it is set here;
    # replace TRUE with a restrictive condition before exposing this publicly
    stats admin if TRUE

    default_backend clamav_api
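
To make the rate-limiting rule concrete, here is a small Python model of what "more than 20 requests in 10 seconds per source IP" means. It is a simplified sliding-window sketch for reasoning about the limit, not HAProxy's internal algorithm, and the class name and example addresses are made up for illustration.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Track request timestamps per client and reject bursts above a limit."""

    def __init__(self, limit: int = 20, window: float = 10.0):
        self.limit = limit      # mirrors "gt 20" in the HAProxy rule
        self.window = window    # mirrors http_req_rate(10s)
        self._hits = defaultdict(deque)

    def allow(self, client_ip: str, now: float) -> bool:
        hits = self._hits[client_ip]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        # Like track-sc0, the request is counted even if it ends up rejected
        hits.append(now)
        return len(hits) <= self.limit


limiter = SlidingWindowLimiter()
decisions = [limiter.allow("198.51.100.7", t * 0.1) for t in range(25)]
print(f"allowed {sum(decisions)} of {len(decisions)} requests in the burst")
```

An upload client that batches many small files can hit this limit quickly, so it is worth sizing the threshold against the expected peak per application server rather than per end user.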

Generate SSL certificates

Create SSL certificates for secure API communication. Use Let's Encrypt or generate self-signed certificates for testing.

# For production, use Let's Encrypt
sudo apt install -y certbot
sudo certbot certonly --standalone -d clamav-api.example.com
# The output redirect must also run as root, so wrap the whole command in sudo sh -c
sudo sh -c 'cat /etc/letsencrypt/live/clamav-api.example.com/fullchain.pem \
    /etc/letsencrypt/live/clamav-api.example.com/privkey.pem > \
    /etc/ssl/certs/clamav-cluster.pem'
sudo chmod 600 /etc/ssl/certs/clamav-cluster.pem

# For testing, generate self-signed certificate
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
    -keyout /etc/ssl/private/clamav.key \
    -out /etc/ssl/certs/clamav.crt \
    -subj "/C=US/ST=State/L=City/O=Organization/CN=clamav-api.example.com"
sudo sh -c 'cat /etc/ssl/certs/clamav.crt /etc/ssl/private/clamav.key > /etc/ssl/certs/clamav-cluster.pem'
sudo chmod 600 /etc/ssl/certs/clamav-cluster.pem

Start HAProxy service

Enable and start the HAProxy load balancer. Verify the configuration and check that backends are healthy.

sudo haproxy -c -f /etc/haproxy/haproxy.cfg
sudo systemctl enable haproxy
sudo systemctl start haproxy
sudo systemctl status haproxy
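
When reading backend health in the logs or stats page, it helps to know how the `fall 3 rise 2` parameters turn individual check results into an up or down state. The sketch below is a simplified model of that behaviour under the settings used in this guide; the class and method names are hypothetical and HAProxy's real implementation has more states.

```python
class ServerHealth:
    """Simplified model of 'check ... fall 3 rise 2' for a single server."""

    def __init__(self, fall: int = 3, rise: int = 2, up: bool = True):
        self.fall = fall
        self.rise = rise
        self.up = up
        self._streak = 0  # consecutive results that contradict the current state

    def record(self, check_passed: bool) -> bool:
        """Feed one health-check result and return whether the server is up."""
        if check_passed == self.up:
            # Result agrees with the current state, so any streak is reset
            self._streak = 0
            return self.up
        self._streak += 1
        needed = self.fall if self.up else self.rise
        if self._streak >= needed:
            self.up = not self.up
            self._streak = 0
        return self.up


node = ServerHealth()
for passed in [False, False, False, True, True]:
    state = "UP" if node.record(passed) else "DOWN"
    print(f"check {'ok' if passed else 'failed'} -> {state}")
```

With checks every 10 seconds, a dead node therefore keeps receiving traffic for up to roughly 30 seconds before it is taken out of rotation, and a recovered node returns after about 20 seconds.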

Configure monitoring with Prometheus

Set up monitoring for cluster health and scanning metrics. This provides visibility into performance and availability. See our Prometheus alerting guide for detailed monitoring setup.

#!/usr/bin/env python3
# Requires the prometheus_client package (pip3 install prometheus-client)
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import socket

# Metrics
# Only clamav_up is populated by this script; the scan metrics are declared for later use
scan_counter = Counter('clamav_scans_total', 'Total scans performed', ['status', 'node'])
scan_duration = Histogram('clamav_scan_duration_seconds', 'Scan duration', ['node'])
infected_counter = Counter('clamav_infected_files_total', 'Infected files found', ['threat', 'node'])
clamav_up = Gauge('clamav_up', 'ClamAV daemon status', ['node'])


def check_clamav_health():
    """Check ClamAV daemon health"""
    try:
        with socket.create_connection(('localhost', 3310), timeout=5) as sock:
            sock.sendall(b"PING\n")
            response = sock.recv(1024).decode().strip()
        return "PONG" in response
    except OSError:
        return False


def collect_metrics():
    """Collect and expose metrics"""
    node_id = socket.gethostname()

    while True:
        # Check ClamAV health
        is_healthy = check_clamav_health()
        clamav_up.labels(node=node_id).set(1 if is_healthy else 0)

        time.sleep(30)


if __name__ == '__main__':
    # Start metrics server
    start_http_server(9090)

    # Start collecting metrics
    collect_metrics()

Create web application integration

Build a simple client library for integrating with your web application. This shows how to call the scanning API from your application code.

#!/usr/bin/env python3
import requests
import json
import os
from typing import Optional, Dict, Any

class ClamAVClient:
    def __init__(self, api_url: str, api_key: str, timeout: int = 30):
        self.api_url = api_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'User-Agent': 'ClamAV-Client/1.0'
        })
    
    def health_check(self) -> Dict[str, Any]:
        """Check API health"""
        try:
            response = self.session.get(
                f"{self.api_url}/health",
                timeout=self.timeout
            )
            return {
                'healthy': response.status_code == 200,
                'data': response.json() if response.status_code == 200 else None,
                'status_code': response.status_code
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e), 'status_code': None}
    
    def scan_file(self, file_path: str) -> Dict[str, Any]:
        """Scan a file"""
        if not os.path.exists(file_path):
            return {'error': 'File not found', 'is_infected': None}
        
        try:
            with open(file_path, 'rb') as f:
                files = {'file': (os.path.basename(file_path), f, 'application/octet-stream')}
                response = self.session.post(
                    f"{self.api_url}/scan",
                    files=files,
                    timeout=self.timeout
                )
            
            result = response.json()
            result['status_code'] = response.status_code
            return result
            
        except Exception as e:
            return {'error': str(e), 'is_infected': None, 'status_code': None}
    
    def scan_upload(self, file_data: bytes, filename: str) -> Dict[str, Any]:
        """Scan uploaded file data"""
        try:
            files = {'file': (filename, file_data, 'application/octet-stream')}
            response = self.session.post(
                f"{self.api_url}/scan",
                files=files,
                timeout=self.timeout
            )
            
            result = response.json()
            result['status_code'] = response.status_code
            return result
            
        except Exception as e:
            return {'error': str(e), 'is_infected': None, 'status_code': None}

# Example usage
if __name__ == '__main__':
    import sys

    # Configure client
    client = ClamAVClient(
        api_url='https://clamav-api.example.com',
        api_key='your-secure-api-key-here'
    )

    # Health check
    health = client.health_check()
    print(f"Health: {health}")

    # Example file scan
    if len(sys.argv) > 1:
        result = client.scan_file(sys.argv[1])
        print(f"Scan result: {json.dumps(result, indent=2)}")
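
The client returns a dictionary whether the scan succeeded or not, so the calling application still has to decide what to do with each outcome. Below is one possible sketch of that decision, based on the API's behaviour in this guide (200 for clean, 409 for infected). The function name `upload_decision` and the three outcome labels are assumptions for illustration; the key point is that a missing verdict is never treated as clean.

```python
from typing import Any, Dict


def upload_decision(result: Dict[str, Any]) -> str:
    """Turn a ClamAVClient result into 'accept', 'reject', or 'hold'."""
    # An explicit infected verdict (or the API's 409 status) always blocks the file
    if result.get("is_infected") is True or result.get("status_code") == 409:
        return "reject"
    # Only an explicit clean verdict with a 200 status lets the file through
    if result.get("status_code") == 200 and result.get("is_infected") is False:
        return "accept"
    # Timeouts, 401/413/5xx responses and malformed replies carry no verdict
    return "hold"


examples = [
    {"status_code": 200, "is_infected": False},
    {"status_code": 409, "is_infected": True, "threat_name": "Eicar-Signature"},
    {"status_code": None, "is_infected": None, "error": "connection timed out"},
]
for example in examples:
    print(example.get("status_code"), "->", upload_decision(example))
```

Files in the "hold" state can be queued for a retry or rejected outright, depending on how much friction the application can tolerate.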

Set up alerting configuration

Configure alerts for infected files and cluster health issues. This integrates with your existing monitoring stack for immediate threat notifications.

# Alert configuration for ClamAV cluster

# Webhook URL for notifications
WEBHOOK_URL=https://hooks.slack.com/your-webhook-url
ALERT_EMAIL=security@example.com

# Thresholds
MAX_INFECTED_PER_HOUR=10
MAX_SCAN_TIME=60
MIN_HEALTHY_NODES=2

# Log locations
API_LOG=/var/log/clamav/api.log
CLAMAV_LOG=/var/log/clamav/clamav.log
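
The configuration file only stores thresholds; something still has to compare them with live numbers. The following sketch shows one way a periodic job might read the KEY=VALUE settings and decide which alerts to raise. The function names and the sample measurements are hypothetical, and how the measurements are gathered (log parsing, Prometheus queries) is left open.

```python
from typing import Dict, List


def load_thresholds(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def evaluate(settings: Dict[str, str], infected_last_hour: int,
             slowest_scan_seconds: float, healthy_nodes: int) -> List[str]:
    """Return one message per threshold that the measurements violate."""
    alerts = []
    if infected_last_hour > int(settings["MAX_INFECTED_PER_HOUR"]):
        alerts.append(f"{infected_last_hour} infected files in the last hour")
    if slowest_scan_seconds > float(settings["MAX_SCAN_TIME"]):
        alerts.append(f"slowest scan took {slowest_scan_seconds}s")
    if healthy_nodes < int(settings["MIN_HEALTHY_NODES"]):
        alerts.append(f"only {healthy_nodes} healthy node(s)")
    return alerts


# Same threshold values as the configuration in this guide
EXAMPLE = """
# Thresholds
MAX_INFECTED_PER_HOUR=10
MAX_SCAN_TIME=60
MIN_HEALTHY_NODES=2
"""
settings = load_thresholds(EXAMPLE)
for alert in evaluate(settings, infected_last_hour=14,
                      slowest_scan_seconds=12.5, healthy_nodes=1):
    print("ALERT:", alert)
```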

Verify your setup

Test the complete scanning workflow to ensure all components work correctly.

# Check service status
sudo systemctl status clamav-daemon
sudo systemctl status clamav-api
sudo systemctl status haproxy

# Test health endpoint
curl -H "Authorization: Bearer your-secure-api-key-here" \
    https://clamav-api.example.com/health

# Test file scanning (the EICAR string is a harmless test signature; expect HTTP 409 with is_infected set to true)
echo 'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' > /tmp/eicar.txt
curl -H "Authorization: Bearer your-secure-api-key-here" \
    -F "file=@/tmp/eicar.txt" \
    https://clamav-api.example.com/scan

# Check HAProxy stats (the frontend redirects plain HTTP and rejects requests without an Authorization header)
curl -H "Authorization: Bearer your-secure-api-key-here" \
    https://clamav-api.example.com/stats

# Verify logs
sudo tail -f /var/log/clamav/api.log /var/log/clamav/clamav.log

Integration examples

Django integration

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.files.storage import default_storage
import requests

@csrf_exempt
def upload_file(request):
    if request.method == 'POST' and request.FILES.get('file'):
        uploaded_file = request.FILES['file']
        
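        # Scan before saving
        scan_result = scan_upload_with_clamav(uploaded_file)

        # Fail closed: without a verdict the file is not stored
        if scan_result.get('is_infected') is None:
            return JsonResponse({
                'error': 'File could not be scanned',
                'details': scan_result.get('error')
            }, status=503)

        if scan_result.get('is_infected'):
            return JsonResponse({
                'error': 'File contains malware',
                'threat': scan_result.get('threat_name'),
                'blocked': True
            }, status=400)

        # Save file if clean (rewind first, because scanning read the file to the end)
        uploaded_file.seek(0)
        file_path = default_storage.save(uploaded_file.name, uploaded_file)

        return JsonResponse({
            'message': 'File uploaded successfully',
            'path': file_path,
            'scan_result': scan_result
        })

    # Any other request (wrong method or no file) gets an explicit error
    return JsonResponse({'error': 'POST a file in the "file" field'}, status=400)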

def scan_upload_with_clamav(file_obj):
    """Scan uploaded file with ClamAV API"""
    files = {'file': (file_obj.name, file_obj.read(), 'application/octet-stream')}
    headers = {'Authorization': 'Bearer your-secure-api-key-here'}
    
    try:
        response = requests.post(
            'https://clamav-api.example.com/scan',
            files=files,
            headers=headers,
            timeout=30
        )
        return response.json()
    except Exception as e:
        return {'error': str(e), 'is_infected': None}

Node.js Express integration

const express = require('express');
const multer = require('multer');
const axios = require('axios');
const FormData = require('form-data');

const app = express();

// Configure multer for memory storage
const upload = multer({ 
    storage: multer.memoryStorage(),
    limits: { fileSize: 100 * 1024 * 1024 } // 100MB limit
});

// ClamAV scanning middleware
async function scanFile(req, res, next) {
    if (!req.file) {
        return next();
    }
    
    try {
        const formData = new FormData();
        formData.append('file', req.file.buffer, {
            filename: req.file.originalname,
            contentType: req.file.mimetype
        });
        
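        const response = await axios.post(
            'https://clamav-api.example.com/scan',
            formData,
            {
                headers: {
                    'Authorization': 'Bearer your-secure-api-key-here',
                    ...formData.getHeaders()
                },
                timeout: 30000,
                // The API answers 409 for infected files; accept it so axios does not throw
                validateStatus: (status) => status === 200 || status === 409
            }
        );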
        
        const scanResult = response.data;
        
        if (scanResult.is_infected) {
            return res.status(400).json({
                error: 'File contains malware',
                threat: scanResult.threat_name,
                blocked: true
            });
        }
        
        // Add scan result to request for logging
        req.scanResult = scanResult;
        next();
        
    } catch (error) {
        console.error('Scan failed:', error.message);
        return res.status(500).json({
            error: 'File scanning failed',
            details: error.message
        });
    }
}

// Upload route with scanning
app.post('/upload', upload.single('file'), scanFile, (req, res) => {
    // File is clean, proceed with upload
    res.json({
        message: 'File uploaded successfully',
        filename: req.file.originalname,
        size: req.file.size,
        scanResult: req.scanResult
    });
});

Common issues

| Symptom | Cause | Fix |
|---|---|---|
| Connection refused to port 3310 | ClamAV daemon not listening on network | Check TCPSocket and TCPAddr in /etc/clamav/clamd.conf |
| 401 Unauthorized from API | Missing or invalid API key | Verify Authorization: Bearer your-key header in requests |
| HAProxy shows backends as down | Health check failing on nodes | Check API service status and /health endpoint response |
| Slow scanning performance | Insufficient resources or outdated signatures | Increase MaxThreads in ClamAV config and run sudo freshclam |
| File upload rejected with 413 | File exceeds size limit | Adjust MAX_FILE_SIZE in API configuration and HAProxy timeout |
| SSL certificate errors | Invalid or expired certificates | Renew Let's Encrypt certificates or check certificate chain |

Monitoring and alerting

Set up comprehensive monitoring for the ClamAV cluster to track performance and detect issues early. Monitor scan rates, infected file counts, and node health across your deployment.

# Add to prometheus.yml under scrape_configs
  - job_name: 'clamav-cluster'
    static_configs:
      - targets: ['203.0.113.10:9090', '203.0.113.11:9090', '203.0.113.12:9090']
        labels:
          service: 'clamav'
  - job_name: 'haproxy'
    static_configs:
      - targets: ['203.0.113.5:8404']
        labels:
          service: 'haproxy'

Configure alerting rules for critical conditions like high infection rates, node failures, or performance degradation. See our Prometheus webhook integration guide for setting up notifications to Slack or email.

Security hardening

Implement additional security measures for production deployment. Run ClamAV processes in isolated containers or VMs to limit blast radius from potential exploits. Use network segmentation to isolate the scanning cluster from other services.

Security recommendation: Generate unique API keys per application and rotate them regularly. Log all scan requests and results for audit trails. Consider implementing IP whitelisting or VPN access for additional security.
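
As a rough illustration of per-application keys, the sketch below issues a random key for each application, stores only a SHA-256 digest of it, and identifies the caller with a constant-time comparison. The function names and the in-memory dictionary are assumptions for the example; a real deployment would keep the digests in a secrets store or database and would replace the single `API_KEY` constant in the API.

```python
import hashlib
import hmac
import secrets
from typing import Dict, Optional


def issue_key(app_name: str, store: Dict[str, str]) -> str:
    """Create (or rotate) the key for one application; only its digest is stored."""
    key = secrets.token_urlsafe(32)
    store[app_name] = hashlib.sha256(key.encode()).hexdigest()
    return key  # shown to the operator once, never stored in plain text


def identify_caller(presented_key: str, store: Dict[str, str]) -> Optional[str]:
    """Return the application that owns the key, or None if it is unknown."""
    digest = hashlib.sha256(presented_key.encode()).hexdigest()
    match = None
    for app_name, stored_digest in store.items():
        # compare_digest avoids leaking how many characters matched
        if hmac.compare_digest(digest, stored_digest):
            match = app_name
    return match


key_store: Dict[str, str] = {}
web_key = issue_key("web-frontend", key_store)
print("caller:", identify_caller(web_key, key_store))

# Rotation: issuing a new key for the same application invalidates the old one
issue_key("web-frontend", key_store)
print("old key still valid:", identify_caller(web_key, key_store) is not None)
```

Knowing which application sent each request also makes the audit log more useful, because an infected upload can be traced back to the service that accepted it.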
