Set up a clustered ClamAV deployment with REST API endpoints for automated malware scanning of file uploads. Configure load balancing, authentication, and monitoring for high-availability antivirus scanning in production environments.
Prerequisites
- At least 3 servers (2 ClamAV nodes + 1 load balancer)
- 2GB RAM per ClamAV node
- Basic knowledge of REST APIs
- SSL certificates for production
What this solves
File uploads in web applications create security risks through malware, viruses, and malicious payloads. A clustered ClamAV deployment with REST API integration provides automatic scanning of uploads across multiple servers with load balancing and failover. This setup handles high-volume file scanning for enterprise applications while maintaining security and availability.
Prerequisites
You'll need at least three servers for this setup: two ClamAV nodes and one load balancer. Each server requires 2GB RAM minimum and 10GB storage for virus definitions. If you haven't worked with ClamAV clustering before, work through the ClamAV cluster basics guide first.
Step-by-step configuration
Install ClamAV and dependencies
Install ClamAV daemon and REST API tools on each scanning node. This includes the core antivirus engine and HTTP interface components.
sudo apt update
sudo apt install -y clamav clamav-daemon clamav-freshclam python3 python3-pip nginx
pip3 install flask gunicorn requests
Configure ClamAV daemon
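Edit /etc/clamav/clamd.conf so that ClamAV accepts TCP connections and is tuned for API usage. This enables scanning requests from the REST API layer. The clamd TCP socket has no authentication, so restrict port 3310 with a firewall, or set TCPAddr to 127.0.0.1 when the API runs on the same node as the daemon, as it does in this guide.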
# Network configuration
TCPSocket 3310
TCPAddr 0.0.0.0
MaxConnectionQueueLength 50
MaxThreads 12
# Performance tuning
MaxDirectoryRecursion 30
MaxFileSize 100M
MaxScanSize 200M
MaxFiles 10000
# Logging
LogFile /var/log/clamav/clamav.log
LogFileMaxSize 10M
LogRotate yes
LogVerbose yes
LogTime yes
# Security
User clamav
LocalSocket /var/run/clamav/clamd.ctl
LocalSocketGroup clamav
LocalSocketMode 666
FixStaleSocket yes
# Disable local-only restrictions
# Comment out these lines if present
# LocalSocket /var/run/clamav/clamd.ctl
# TCPSocket disabled by default
Update virus definitions
Download the latest virus signatures and configure automatic updates. Fresh definitions are critical for detecting new threats.
sudo freshclam
sudo systemctl enable clamav-freshclam
sudo systemctl start clamav-freshclam
Start ClamAV daemon
Enable and start the ClamAV daemon service. Verify it's listening on the correct port for API connections.
sudo systemctl enable clamav-daemon
sudo systemctl start clamav-daemon
sudo systemctl status clamav-daemon
sudo netstat -tlnp | grep 3310
Create REST API service
Build a Flask application that provides HTTP endpoints for file scanning. This wraps ClamAV's socket interface in REST API calls.
#!/usr/bin/env python3
from flask import Flask, request, jsonify
import socket
import tempfile
import os
import hashlib
import hmac
from datetime import datetime
import logging
app = Flask(__name__)
# Configure logging
logging.basicConfig(
    filename='/var/log/clamav/api.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s'
)
# Configuration
CLAMAV_HOST = 'localhost'
CLAMAV_PORT = 3310
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
API_KEY = 'your-secure-api-key-here'
def scan_file(file_path):
    """Scan file using ClamAV daemon"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((CLAMAV_HOST, CLAMAV_PORT))
        # Send SCAN command
        sock.sendall(f"SCAN {file_path}\n".encode())
        result = sock.recv(1024).decode().strip()
        sock.close()
        return result
    except Exception as e:
        logging.error(f"Scan error: {str(e)}")
        return f"ERROR: {str(e)}"
def verify_api_key():
    """Verify API key from request headers"""
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        return False
    token = auth_header.split(' ')[1]
    # Constant-time comparison avoids leaking key contents through timing
    return hmac.compare_digest(token.encode(), API_KEY.encode())
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect((CLAMAV_HOST, CLAMAV_PORT))
        sock.send(b"PING\n")
        response = sock.recv(1024).decode().strip()
        sock.close()
        if "PONG" in response:
            return jsonify({"status": "healthy", "clamav": "connected"}), 200
        else:
            return jsonify({"status": "unhealthy", "error": "ClamAV not responding"}), 503
    except Exception as e:
        return jsonify({"status": "unhealthy", "error": str(e)}), 503
@app.route('/scan', methods=['POST'])
def scan_upload():
    """Scan uploaded file"""
    if not verify_api_key():
        return jsonify({"error": "Invalid or missing API key"}), 401
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "Empty file name"}), 400
    # Check file size
    file.seek(0, 2)  # Seek to end
    size = file.tell()
    file.seek(0)  # Reset to beginning
    if size > MAX_FILE_SIZE:
        return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE} bytes"}), 413
    # Create temporary file
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        file.save(temp_file.name)
        temp_path = temp_file.name
    try:
        # Temporary files are created with mode 0600; clamd runs as the
        # clamav user and needs read access to scan the path
        os.chmod(temp_path, 0o644)
        # Calculate file hash
        with open(temp_path, 'rb') as f:
            file_hash = hashlib.sha256(f.read()).hexdigest()
        # Scan file
        scan_result = scan_file(temp_path)
        # Report daemon errors as failures instead of clean results
        if scan_result.startswith("ERROR") or scan_result.endswith("ERROR"):
            logging.error(f"Scan failed for {file.filename}: {scan_result}")
            return jsonify({"error": "Scan failed", "details": scan_result}), 500
        # Parse result, e.g. "/tmp/tmpab12cd: Eicar-Test-Signature FOUND"
        is_infected = scan_result.endswith("FOUND")
        threat_name = None
        if is_infected:
            parts = scan_result.split()
            if len(parts) >= 2:
                threat_name = parts[1]
        response = {
            "filename": file.filename,
            "file_size": size,
            "file_hash": file_hash,
            "scan_time": datetime.utcnow().isoformat(),
            "is_infected": is_infected,
            "threat_name": threat_name,
            "scan_result": scan_result,
            "node_id": socket.gethostname()
        }
        # Log scan result
        logging.info(f"Scanned {file.filename}: {scan_result}")
        status_code = 200 if not is_infected else 409
        return jsonify(response), status_code
    except Exception as e:
        logging.error(f"Scan failed for {file.filename}: {str(e)}")
        return jsonify({"error": "Scan failed", "details": str(e)}), 500
    finally:
        # Clean up temporary file
        if os.path.exists(temp_path):
            os.unlink(temp_path)
@app.route('/version', methods=['GET'])
def get_version():
    """Get ClamAV version info"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((CLAMAV_HOST, CLAMAV_PORT))
        sock.send(b"VERSION\n")
        version = sock.recv(1024).decode().strip()
        sock.close()
        return jsonify({"version": version, "node_id": socket.gethostname()}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)
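The SCAN command used above hands clamd a file path, so the daemon must be able to read the temporary file. An alternative is clamd's INSTREAM command, which sends the file contents over the socket instead. The following is a minimal sketch of that approach, not part of the service above: the helper names (instream_frames, parse_reply, scan_bytes) and the chunk size are assumptions chosen for illustration.

```python
import socket
import struct
from typing import Iterator, Optional, Tuple

CHUNK_SIZE = 8192  # arbitrary chunk size chosen for this sketch


def instream_frames(data: bytes, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield the byte frames of a clamd INSTREAM request for `data`."""
    # The "z" prefix selects null-terminated commands and replies.
    yield b"zINSTREAM\0"
    for offset in range(0, len(data), chunk_size):
        chunk = data[offset:offset + chunk_size]
        # Each chunk is prefixed with its length as a 4-byte big-endian integer.
        yield struct.pack("!I", len(chunk)) + chunk
    # A zero-length chunk tells clamd the stream is complete.
    yield struct.pack("!I", 0)


def parse_reply(reply: str) -> Tuple[str, Optional[str]]:
    """Map a clamd reply line to (status, signature name or None)."""
    text = reply.strip(" \r\n\0")
    if text.endswith(" FOUND"):
        signature = text[: -len(" FOUND")].split(": ", 1)[-1]
        return "infected", signature
    if text.endswith(" OK"):
        return "clean", None
    # Anything else (for example a size-limit error) is treated as a failure.
    return "error", None


def scan_bytes(data: bytes, host: str = "localhost", port: int = 3310) -> Tuple[str, Optional[str]]:
    """Stream `data` to clamd over TCP and return the parsed verdict."""
    with socket.create_connection((host, port), timeout=30) as sock:
        for frame in instream_frames(data):
            sock.sendall(frame)
        reply = sock.recv(4096).decode(errors="replace")
    return parse_reply(reply)
```

With this approach the API could call scan_bytes(file.read()) and skip the temporary file entirely. Note that clamd rejects streams larger than its StreamMaxLength setting, so that value would need to match MAX_FILE_SIZE.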
Create systemd service
Configure the API as a system service for automatic startup and process management. This ensures the API restarts after system reboots.
[Unit]
Description=ClamAV REST API
After=network.target clamav-daemon.service
Requires=clamav-daemon.service
[Service]
Type=exec
User=www-data
Group=www-data
WorkingDirectory=/opt/clamav-api
# Bind to all interfaces so the load balancer can reach the API on port 8080;
# restrict access to the load balancer's address with a firewall.
# Adjust the gunicorn path to match the output of: command -v gunicorn
ExecStart=/usr/bin/gunicorn --bind 0.0.0.0:8080 --workers 4 --timeout 60 app:app
Restart=always
RestartSec=10
# Security settings
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/var/log/clamav /tmp
# Resource limits
LimitNOFILE=1024
LimitNPROC=512
[Install]
WantedBy=multi-user.target
Set up API directory and permissions
Create the application directory and set correct ownership. The www-data user needs access to run the Flask application, which should be saved as /opt/clamav-api/app.py before the chmod command for that file is run.
sudo mkdir -p /opt/clamav-api
sudo mkdir -p /var/log/clamav
sudo chown -R www-data:www-data /opt/clamav-api
sudo chown -R clamav:clamav /var/log/clamav
sudo chmod 755 /opt/clamav-api
sudo chmod 644 /opt/clamav-api/app.py
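sudo chmod 755 /var/log/clamav
# The API runs as www-data, so give it its own writable log file
sudo touch /var/log/clamav/api.log
sudo chown www-data:www-data /var/log/clamav/api.log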
Start the API service
Enable and start the REST API service. Verify it's running and accessible on the correct port.
sudo systemctl daemon-reload
sudo systemctl enable clamav-api
sudo systemctl start clamav-api
sudo systemctl status clamav-api
sudo netstat -tlnp | grep 8080
Configure HAProxy load balancer
Set up HAProxy on a separate server to distribute requests across ClamAV nodes. This provides high availability and horizontal scaling.
sudo apt install -y haproxy
global
    log stdout local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s
    user haproxy
    group haproxy
    daemon
defaults
    mode http
    log global
    option httplog
    option dontlognull
    option log-health-checks
    timeout connect 5000
    timeout client 60000
    timeout server 60000
    timeout http-request 15s
    timeout http-keep-alive 15s
    # Statistics interface
    stats enable
    stats uri /stats
    stats refresh 30s
    # "stats admin" cannot be set in a defaults section; enable it only in a
    # dedicated, access-controlled frontend or listen section if you need it
# ClamAV API backend
backend clamav_api
    balance roundrobin
    option httpchk GET /health
    http-check expect status 200
    # Add your ClamAV nodes here
    server clamav1 203.0.113.10:8080 check inter 10s fall 3 rise 2
    server clamav2 203.0.113.11:8080 check inter 10s fall 3 rise 2
    server clamav3 203.0.113.12:8080 check inter 10s fall 3 rise 2
# Frontend for API requests
frontend clamav_frontend
    bind *:80
    bind *:443 ssl crt /etc/ssl/certs/clamav-cluster.pem
    # Force HTTPS
    redirect scheme https if !{ ssl_fc }
    # Rate limiting
    stick-table type ip size 100k expire 30s store http_req_rate(10s)
    http-request track-sc0 src
    http-request reject if { sc_http_req_rate(0) gt 20 }
    # API key validation
    http-request reject unless { req.hdr(authorization) -m found }
    default_backend clamav_api
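The rate-limiting rules in the frontend track requests per source IP over a 10-second period and reject a client once its count exceeds 20. The sketch below illustrates that idea with a plain sliding window in Python. It is only an illustration: HAProxy computes its rate counters differently internally, and the class name SlidingWindowLimiter is hypothetical.

```python
from collections import defaultdict, deque
from typing import Deque, Dict

WINDOW_SECONDS = 10.0  # mirrors http_req_rate(10s)
MAX_REQUESTS = 20      # mirrors "gt 20"


class SlidingWindowLimiter:
    """Per-client request limiter over a sliding time window."""

    def __init__(self, window: float = WINDOW_SECONDS, limit: int = MAX_REQUESTS):
        self.window = window
        self.limit = limit
        self.hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_ip: str, now: float) -> bool:
        hits = self.hits[client_ip]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        # Count the request first, then decide, as the frontend rules do
        # (track-sc0 runs before the reject rule).
        hits.append(now)
        return len(hits) <= self.limit
```

Because rejected requests are still counted, a client that keeps hammering the endpoint stays blocked until its request rate drops, which matches the intent of the frontend rules.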
Generate SSL certificates
Create SSL certificates for secure API communication. Use Let's Encrypt or generate self-signed certificates for testing.
# For production, use Let's Encrypt
sudo apt install -y certbot
sudo certbot certonly --standalone -d clamav-api.example.com
# Shell redirection runs as the invoking user, so write the bundle through sudo tee
sudo cat /etc/letsencrypt/live/clamav-api.example.com/fullchain.pem \
    /etc/letsencrypt/live/clamav-api.example.com/privkey.pem | \
    sudo tee /etc/ssl/certs/clamav-cluster.pem > /dev/null
# For testing, generate self-signed certificate
sudo openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
    -keyout /etc/ssl/private/clamav.key \
    -out /etc/ssl/certs/clamav.crt \
    -subj "/C=US/ST=State/L=City/O=Organization/CN=clamav-api.example.com"
sudo cat /etc/ssl/certs/clamav.crt /etc/ssl/private/clamav.key | \
    sudo tee /etc/ssl/certs/clamav-cluster.pem > /dev/null
# The bundle contains the private key, so restrict access to root
sudo chmod 600 /etc/ssl/certs/clamav-cluster.pem
Start HAProxy service
Enable and start the HAProxy load balancer. Verify the configuration and check that backends are healthy.
sudo haproxy -c -f /etc/haproxy/haproxy.cfg
sudo systemctl enable haproxy
sudo systemctl start haproxy
sudo systemctl status haproxy
Configure monitoring with Prometheus
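Set up monitoring for cluster health and scanning metrics. This provides visibility into performance and availability. The exporter script below requires the prometheus_client package (pip3 install prometheus-client) on each node. See our Prometheus alerting guide for detailed monitoring setup.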
#!/usr/bin/env python3
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import socket
# Metrics
scan_counter = Counter('clamav_scans_total', 'Total scans performed', ['status', 'node'])
scan_duration = Histogram('clamav_scan_duration_seconds', 'Scan duration', ['node'])
infected_counter = Counter('clamav_infected_files_total', 'Infected files found', ['threat', 'node'])
clamav_up = Gauge('clamav_up', 'ClamAV daemon status', ['node'])
def check_clamav_health():
    """Check ClamAV daemon health"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        sock.connect(('localhost', 3310))
        sock.send(b"PING\n")
        response = sock.recv(1024).decode().strip()
        sock.close()
        return "PONG" in response
    except Exception:
        return False
def collect_metrics():
    """Collect and expose metrics"""
    node_id = socket.gethostname()
    while True:
        # Check ClamAV health
        is_healthy = check_clamav_health()
        clamav_up.labels(node=node_id).set(1 if is_healthy else 0)
        time.sleep(30)
if __name__ == '__main__':
    # Start metrics server
    start_http_server(9090)
    # Start collecting metrics
    collect_metrics()
Create web application integration
Build a simple client library for integrating with your web application. This shows how to call the scanning API from your application code.
#!/usr/bin/env python3
import requests
import json
import os
import sys
from typing import Optional, Dict, Any
class ClamAVClient:
    def __init__(self, api_url: str, api_key: str, timeout: int = 30):
        self.api_url = api_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'User-Agent': 'ClamAV-Client/1.0'
        })
    def health_check(self) -> Dict[str, Any]:
        """Check API health"""
        try:
            response = self.session.get(
                f"{self.api_url}/health",
                timeout=self.timeout
            )
            return {
                'healthy': response.status_code == 200,
                'data': response.json() if response.status_code == 200 else None,
                'status_code': response.status_code
            }
        except Exception as e:
            return {'healthy': False, 'error': str(e), 'status_code': None}
    def scan_file(self, file_path: str) -> Dict[str, Any]:
        """Scan a file"""
        if not os.path.exists(file_path):
            return {'error': 'File not found', 'is_infected': None}
        try:
            with open(file_path, 'rb') as f:
                files = {'file': (os.path.basename(file_path), f, 'application/octet-stream')}
                response = self.session.post(
                    f"{self.api_url}/scan",
                    files=files,
                    timeout=self.timeout
                )
            result = response.json()
            result['status_code'] = response.status_code
            return result
        except Exception as e:
            return {'error': str(e), 'is_infected': None, 'status_code': None}
    def scan_upload(self, file_data: bytes, filename: str) -> Dict[str, Any]:
        """Scan uploaded file data"""
        try:
            files = {'file': (filename, file_data, 'application/octet-stream')}
            response = self.session.post(
                f"{self.api_url}/scan",
                files=files,
                timeout=self.timeout
            )
            result = response.json()
            result['status_code'] = response.status_code
            return result
        except Exception as e:
            return {'error': str(e), 'is_infected': None, 'status_code': None}
# Example usage
if __name__ == '__main__':
    # Configure client
    client = ClamAVClient(
        api_url='https://clamav-api.example.com',
        api_key='your-secure-api-key-here'
    )
    # Health check
    health = client.health_check()
    print(f"Health: {health}")
    # Example file scan
    if len(sys.argv) > 1:
        result = client.scan_file(sys.argv[1])
        print(f"Scan result: {json.dumps(result, indent=2)}")
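The client returns a dictionary in every case, including timeouts and API errors, where is_infected is None. Calling code should therefore fail closed and store a file only when the scan actually completed and reported it clean. A minimal sketch of such a check follows; the helper name is_safe_to_store is hypothetical and not part of the client above.

```python
from typing import Any, Dict


def is_safe_to_store(result: Dict[str, Any]) -> bool:
    """Return True only when the scan completed and reported the file clean."""
    # Transport failures and API errors carry an "error" key.
    if result.get("error"):
        return False
    # The API answers 200 for clean files and 409 for infected ones.
    if result.get("status_code") != 200:
        return False
    # A missing or None verdict is treated the same as an infected file.
    return result.get("is_infected") is False
```

A caller would then write something like `if is_safe_to_store(client.scan_file(path)): ...` and reject or quarantine the upload in every other case.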
Set up alerting configuration
Configure alerts for infected files and cluster health issues. This integrates with your existing monitoring stack for immediate threat notifications.
# Alert configuration for ClamAV cluster
# Webhook URL for notifications
WEBHOOK_URL=https://hooks.slack.com/your-webhook-url
ALERT_EMAIL=security@example.com
# Thresholds
MAX_INFECTED_PER_HOUR=10
MAX_SCAN_TIME=60
MIN_HEALTHY_NODES=2
# Log locations
API_LOG=/var/log/clamav/api.log
CLAMAV_LOG=/var/log/clamav/clamav.log
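The guide does not show how these thresholds are evaluated, so the following is only a sketch of one way to check MAX_INFECTED_PER_HOUR against the API log. It assumes the log line format configured in the Flask application ("timestamp LEVEL: Scanned name: result") and that timestamps are in the server's local time. The function names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Iterable

MAX_INFECTED_PER_HOUR = 10  # same threshold as the configuration above


def count_recent_infections(lines: Iterable[str], now: datetime,
                            window: timedelta = timedelta(hours=1)) -> int:
    """Count log lines ending in FOUND whose timestamp lies within the window."""
    count = 0
    for line in lines:
        line = line.rstrip()
        if not line.endswith("FOUND"):
            continue
        try:
            # logging's default asctime looks like "2024-05-01 12:00:00,123".
            stamp = datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
        except ValueError:
            continue  # skip lines that do not start with a timestamp
        if timedelta(0) <= now - stamp <= window:
            count += 1
    return count


def should_alert(lines: Iterable[str], now: datetime) -> bool:
    """Return True when infections in the last hour exceed the threshold."""
    return count_recent_infections(lines, now) > MAX_INFECTED_PER_HOUR
```

A cron job could read API_LOG, call should_alert(open(path), datetime.now()), and post to WEBHOOK_URL when it returns True.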
Verify your setup
Test the complete scanning workflow to ensure all components work correctly.
# Check service status
sudo systemctl status clamav-daemon
sudo systemctl status clamav-api
sudo systemctl status haproxy
# Test health endpoint
curl -H "Authorization: Bearer your-secure-api-key-here" \
https://clamav-api.example.com/health
# Test file scanning
echo 'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' > /tmp/eicar.txt
curl -H "Authorization: Bearer your-secure-api-key-here" \
-F "file=@/tmp/eicar.txt" \
https://clamav-api.example.com/scan
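# Check HAProxy stats (the frontend redirects HTTP to HTTPS and requires an Authorization header)
curl -H "Authorization: Bearer your-secure-api-key-here" \
    https://clamav-api.example.com/stats
# Verify logs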
sudo tail -f /var/log/clamav/api.log
sudo tail -f /var/log/clamav/clamav.log
Integration examples
Django integration
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.files.storage import default_storage
import requests
@csrf_exempt
def upload_file(request):
    if request.method == 'POST' and request.FILES.get('file'):
        uploaded_file = request.FILES['file']
        # Scan before saving
        scan_result = scan_upload_with_clamav(uploaded_file)
        # Fail closed: a missing verdict means the scan did not complete
        if scan_result.get('is_infected') is None:
            return JsonResponse({
                'error': 'File could not be scanned',
                'details': scan_result.get('error')
            }, status=503)
        if scan_result.get('is_infected'):
            return JsonResponse({
                'error': 'File contains malware',
                'threat': scan_result.get('threat_name'),
                'blocked': True
            }, status=400)
        # Save file if clean
        file_path = default_storage.save(uploaded_file.name, uploaded_file)
        return JsonResponse({
            'message': 'File uploaded successfully',
            'path': file_path,
            'scan_result': scan_result
        })
    # Anything other than a POST with a file is rejected
    return JsonResponse({'error': 'POST a file in the "file" field'}, status=400)
def scan_upload_with_clamav(file_obj):
    """Scan uploaded file with ClamAV API"""
    files = {'file': (file_obj.name, file_obj.read(), 'application/octet-stream')}
    headers = {'Authorization': 'Bearer your-secure-api-key-here'}
    try:
        response = requests.post(
            'https://clamav-api.example.com/scan',
            files=files,
            headers=headers,
            timeout=30
        )
        return response.json()
    except Exception as e:
        return {'error': str(e), 'is_infected': None}
Node.js Express integration
const express = require('express');
const multer = require('multer');
const axios = require('axios');
const FormData = require('form-data');
const app = express();
// Configure multer for memory storage
const upload = multer({
  storage: multer.memoryStorage(),
  limits: { fileSize: 100 * 1024 * 1024 } // 100MB limit
});
// ClamAV scanning middleware
async function scanFile(req, res, next) {
  if (!req.file) {
    return next();
  }
  try {
    const formData = new FormData();
    formData.append('file', req.file.buffer, {
      filename: req.file.originalname,
      contentType: req.file.mimetype
    });
    const response = await axios.post(
      'https://clamav-api.example.com/scan',
      formData,
      {
        headers: {
          'Authorization': 'Bearer your-secure-api-key-here',
          ...formData.getHeaders()
        },
        timeout: 30000,
        // The API answers 409 for infected files; accept it so axios does not throw
        validateStatus: (status) => status === 200 || status === 409
      }
    );
    const scanResult = response.data;
    if (scanResult.is_infected) {
      return res.status(400).json({
        error: 'File contains malware',
        threat: scanResult.threat_name,
        blocked: true
      });
    }
    // Add scan result to request for logging
    req.scanResult = scanResult;
    next();
  } catch (error) {
    console.error('Scan failed:', error.message);
    return res.status(500).json({
      error: 'File scanning failed',
      details: error.message
    });
  }
}
// Upload route with scanning
app.post('/upload', upload.single('file'), scanFile, (req, res) => {
  // File is clean, proceed with upload
  res.json({
    message: 'File uploaded successfully',
    filename: req.file.originalname,
    size: req.file.size,
    scanResult: req.scanResult
  });
});
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Connection refused to port 3310 | ClamAV daemon not listening on network | Check TCPSocket and TCPAddr in /etc/clamav/clamd.conf |
| 401 Unauthorized from API | Missing or invalid API key | Verify Authorization: Bearer your-key header in requests |
| HAProxy shows backends as down | Health check failing on nodes | Check API service status and /health endpoint response |
| Slow scanning performance | Insufficient resources or outdated signatures | Increase MaxThreads in ClamAV config and run sudo freshclam |
| File upload rejected with 413 | File exceeds size limit | Adjust MAX_FILE_SIZE in API configuration and HAProxy timeout |
| SSL certificate errors | Invalid or expired certificates | Renew Let's Encrypt certificates or check certificate chain |
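Several rows in the table come down to whether a port is reachable from where you are testing. As a quick first check, a small probe like the one below can tell "connection refused" apart from an application-level error. This is a sketch only: the function name port_open is hypothetical, and the addresses in the usage note are the example values used throughout this guide.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For example, port_open("localhost", 3310) run on a scanning node checks the ClamAV daemon, and port_open("203.0.113.10", 8080) run on the load balancer checks whether HAProxy can reach that node's API.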
Monitoring and alerting
Set up comprehensive monitoring for the ClamAV cluster to track performance and detect issues early. Monitor scan rates, infected file counts, and node health across your deployment.
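# Add under scrape_configs in prometheus.yml
  - job_name: 'clamav-cluster'
    static_configs:
      - targets: ['203.0.113.10:9090', '203.0.113.11:9090', '203.0.113.12:9090']
        labels:
          service: 'clamav'
  # Assumes HAProxy exposes metrics on port 8404, which the HAProxy
  # configuration in this guide does not set up
  - job_name: 'haproxy'
    static_configs:
      - targets: ['203.0.113.5:8404']
        labels:
          service: 'haproxy'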
Configure alerting rules for critical conditions like high infection rates, node failures, or performance degradation. See our Prometheus webhook integration guide for setting up notifications to Slack or email.
Security hardening
Implement additional security measures for production deployment. Run ClamAV processes in isolated containers or VMs to limit blast radius from potential exploits. Use network segmentation to isolate the scanning cluster from other services.
Next steps
- Set up network policy monitoring for container security
- Configure high-availability caching for API responses
- Implement centralized security monitoring with ELK stack integration
- Secure API keys with Vault for credential management