Set up a DuckDB cluster with distributed query processing, network security, and performance optimization for high-throughput analytical workloads across multiple nodes.
Prerequisites
- Root or sudo access
- Multiple servers or VMs
- Basic Python knowledge
- Network connectivity between nodes
What this solves
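DuckDB is an in-process, single-node engine, so a DuckDB "cluster" here means several independent DuckDB instances, each behind a small HTTP service, that you manage and query from one place. This tutorial walks through configuring those instances for cluster operations, routing queries to them, and implementing security measures for production workloads. Note that the coordinator shown below executes every query on its local node; true query distribution and data sharding are left as an extension point.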
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions and security patches.
sudo apt update && sudo apt upgrade -y
# unzip is required to extract the DuckDB CLI archive in the next step
sudo apt install -y wget curl unzip python3 python3-pip build-essential
Install DuckDB on all cluster nodes
Download and install DuckDB on each node that will participate in the cluster. We'll use the Python client for cluster coordination.
wget https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.zip
unzip -o duckdb_cli-linux-amd64.zip
sudo install -m 755 duckdb /usr/local/bin/duckdb
rm -f duckdb duckdb_cli-linux-amd64.zip
# Install system-wide (sudo) so the duckdb service user can import the packages.
# requests and psutil are needed by manage.py and monitor.py.
# On Ubuntu 23.04+/Debian 12, pip refuses system installs (PEP 668): append --break-system-packages there.
sudo pip3 install duckdb requests psutil
Create cluster configuration directory
Set up the directory structure for cluster configuration files and shared storage.
# Configuration, data and log directories, owned by an unprivileged system account
sudo mkdir -p /opt/duckdb/cluster /opt/duckdb/data /opt/duckdb/logs
sudo useradd --system --shell /bin/false --home-dir /opt/duckdb duckdb
sudo chown -R duckdb:duckdb /opt/duckdb
Configure cluster coordinator script
Save the main coordinator script as /opt/duckdb/cluster/coordinator.py on every node. It opens the node's DuckDB database and exposes an HTTP endpoint for queries.
#!/usr/bin/env python3
import duckdb
import socket
import threading
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
class DuckDBClusterCoordinator:
    """Own one node's DuckDB database and run queries against it.

    Query distribution is not implemented yet: ``execute_distributed_query``
    runs every query on this node's local database.
    """

    def __init__(self, node_id, cluster_nodes, data_dir='/opt/duckdb/data',
                 threads=4, memory_limit='2GB'):
        """Store node settings; no database is opened until initialize_node().

        Args:
            node_id: Identifier of this node; used in the database file name.
            cluster_nodes: Names of all nodes in the cluster (not used yet).
            data_dir: Directory that holds the node's ``.db`` file.
            threads: Value for DuckDB's ``threads`` setting.
            memory_limit: Value for DuckDB's ``memory_limit`` setting, e.g. '2GB'.
        """
        self.node_id = node_id
        self.cluster_nodes = cluster_nodes
        self.data_dir = data_dir
        self.threads = int(threads)
        self.memory_limit = str(memory_limit)
        self.connection = None
        self.is_leader = False

    def initialize_node(self):
        """Open the node's database file and apply the performance settings.

        Returns:
            True on success. False if the database could not be opened or
            configured; the error is printed and any half-open connection is
            closed.
        """
        db_path = f"{self.data_dir}/node_{self.node_id}.db"
        try:
            self.connection = duckdb.connect(db_path)
            self.connection.execute(f"SET threads = {self.threads}")
            # Double any quotes so the operator-supplied size stays one SQL literal.
            limit = self.memory_limit.replace("'", "''")
            self.connection.execute(f"SET memory_limit = '{limit}'")
            # Progress reporting for long-running queries.
            self.connection.execute("SET enable_progress_bar=true")
            # Let DuckDB reorder rows; per the DuckDB docs this reduces memory
            # pressure for large imports/exports.
            self.connection.execute("SET preserve_insertion_order=false")
        except Exception as e:
            print(f"Failed to initialize node {self.node_id}: {e}")
            self._close()
            return False
        print(f"Node {self.node_id} initialized successfully")
        return True

    def _close(self):
        """Close and forget the current connection (best effort)."""
        if self.connection is not None:
            try:
                self.connection.close()
            except Exception:
                # Already failing; a close error adds no useful information.
                pass
            self.connection = None

    def execute_distributed_query(self, query, partition_key=None):
        """Run *query* and return all result rows.

        Args:
            query: SQL text to execute.
            partition_key: Reserved for future sharding; currently ignored.

        Returns:
            A list of row tuples. Returns None if the node is not initialized
            or the query failed (the error is printed).
        """
        if self.connection is None:
            return None
        try:
            # Distribution across cluster_nodes is not implemented: run locally.
            return self.connection.execute(query).fetchall()
        except Exception as e:
            print(f"Query execution failed on node {self.node_id}: {e}")
            return None
class ClusterHTTPHandler(BaseHTTPRequestHandler):
    """HTTP API of one cluster node.

    Routes:
        GET or POST /health -- liveness probe; returns
                               {"status": "healthy", "node": <node_id>}.
        POST /execute       -- body {"query": "<sql>"}; runs it through the
                               module-level ``coordinator`` and returns
                               {"status": "success", "data": rows}.

    /execute runs arbitrary SQL. If the DUCKDB_CLUSTER_TOKEN environment
    variable is set, requests must send ``Authorization: Bearer <token>``.
    Without it, keep the port firewalled to trusted hosts.
    """

    def _send_json(self, status, payload):
        """Send *payload* as a JSON response with HTTP *status*."""
        # Encode before send_response(): once the status line is written, an
        # error can no longer be reported cleanly. default=str renders DuckDB
        # values that json cannot encode (Decimal, date, UUID, ...) as text.
        try:
            body = json.dumps(payload, default=str).encode('utf-8')
        except (TypeError, ValueError):
            self.send_error(500, "Could not encode query result")
            return
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _authorized(self):
        """Return True if no token is configured or the request presents it."""
        import hmac
        import os

        token = os.environ.get('DUCKDB_CLUSTER_TOKEN')
        if not token:
            return True
        supplied = self.headers.get('Authorization') or ''
        # compare_digest avoids leaking the token through timing differences.
        return hmac.compare_digest(supplied.encode('utf-8'),
                                   f'Bearer {token}'.encode('utf-8'))

    def _send_health(self):
        """Report that this node's HTTP server is up."""
        self._send_json(200, {'status': 'healthy', 'node': coordinator.node_id})

    def do_GET(self):
        if self.path == '/health':
            self._send_health()
        else:
            self.send_error(404)

    def do_POST(self):
        if self.path == '/health':
            self._send_health()
            return
        if self.path != '/execute':
            self.send_error(404)
            return
        if not self._authorized():
            self.send_error(401, "Missing or invalid token")
            return
        try:
            content_length = int(self.headers.get('Content-Length') or 0)
            if content_length < 0:
                raise ValueError("negative Content-Length")
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors too.
            data = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except ValueError:
            self.send_error(400, "Request body must be JSON")
            return
        query = data.get('query') if isinstance(data, dict) else None
        if not query:
            self.send_error(400, "Missing query parameter")
            return
        result = coordinator.execute_distributed_query(query)
        if result is None:
            # The coordinator prints the underlying error to the node's log.
            self._send_json(500, {'status': 'error', 'message': 'Query failed; see node log'})
            return
        self._send_json(200, {'status': 'success', 'data': result})
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print("Usage: python3 coordinator.py <node_id>   (e.g. node1)")
        sys.exit(1)

    node_id = sys.argv[1]
    cluster_nodes = ['node1', 'node2', 'node3']  # Configure your nodes

    # nodeN listens on 8080 + N (node1 -> 8081, matching cluster.conf);
    # ids that do not contain "node" fall back to 8080.
    port = 8080
    if 'node' in node_id:
        try:
            port = 8080 + int(node_id.replace('node', ''))
        except ValueError:
            print(f"Cannot derive a port from node id {node_id!r}; expected nodeN")
            sys.exit(1)

    # Module-level name: ClusterHTTPHandler looks up `coordinator` globally.
    coordinator = DuckDBClusterCoordinator(node_id, cluster_nodes)
    if not coordinator.initialize_node():
        # Non-zero exit so systemd records the failure and restarts the unit.
        sys.exit(1)

    server = HTTPServer(('0.0.0.0', port), ClusterHTTPHandler)
    print(f"Node {node_id} listening on port {port}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
Create systemd service for cluster nodes
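Create the systemd template unit /etc/systemd/system/duckdb-cluster@.service so that each node runs as duckdb-cluster@&lt;node_id&gt; (for example duckdb-cluster@node1).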
[Unit]
Description=DuckDB Cluster Node %i
# network-online.target only delays startup when listed in both Wants= and After=
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=duckdb
Group=duckdb
WorkingDirectory=/opt/duckdb
ExecStart=/usr/bin/python3 /opt/duckdb/cluster/coordinator.py %i
Restart=always
RestartSec=10
Environment=PYTHONPATH=/opt/duckdb
# Write print() output to the journal immediately instead of block-buffering it
Environment=PYTHONUNBUFFERED=1
StandardOutput=journal
StandardError=journal
SyslogIdentifier=duckdb-cluster-%i

# Security hardening
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ReadWritePaths=/opt/duckdb
ProtectHome=yes

[Install]
WantedBy=multi-user.target
Configure cluster networking
Set up firewall rules and network configuration for cluster communication.
# Allow the node ports only from the cluster subnet. Do not add a blanket
# "ufw allow 8080:8083/tcp": it would expose the /execute endpoint, which
# runs arbitrary SQL, to every host that can reach this machine.
sudo ufw allow from 203.0.113.0/24 to any port 8080:8083 proto tcp comment 'DuckDB Cluster'
# Rules only take effect while ufw is active (allow SSH before enabling it)
sudo ufw status verbose
Create cluster configuration file
Define the cluster topology and node roles in /opt/duckdb/cluster/cluster.conf, which the management script reads.
# Cluster topology read by manage.py (default path /opt/duckdb/cluster/cluster.conf).
# manage.py only loads sections whose names start with "node"; the [cluster]
# and [security] sections below are not read by manage.py or coordinator.py.
[cluster]
name = production_analytics
nodes = node1,node2,node3
# Not implemented: each node keeps its own local database file.
replication_factor = 2
# Per-node settings. manage.py sends queries to the node whose role is
# "coordinator". port must match what coordinator.py listens on (8080 + N
# for nodeN). data_dir, max_memory and max_threads are not read by
# coordinator.py, which sets its own data directory, thread count and memory limit.
[node1]
host = 203.0.113.10
port = 8081
role = coordinator
data_dir = /opt/duckdb/data
max_memory = 4GB
max_threads = 8
[node2]
host = 203.0.113.11
port = 8082
role = worker
data_dir = /opt/duckdb/data
max_memory = 4GB
max_threads = 8
[node3]
host = 203.0.113.12
port = 8083
role = worker
data_dir = /opt/duckdb/data
max_memory = 4GB
max_threads = 8
# Intended security settings only: manage.py talks plain http:// to the nodes,
# and coordinator.py serves plain HTTP.
[security]
ssl_enabled = true
ssl_cert = /opt/duckdb/ssl/cluster.crt
ssl_key = /opt/duckdb/ssl/cluster.key
auth_required = true
auth_method = token
Set up SSL certificates for cluster security
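Generate a self-signed certificate for inter-node communication. Note that coordinator.py serves plain HTTP, so these files only take effect if you terminate TLS in front of the nodes (for example in HAProxy).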
# Key directory accessible only to the duckdb service user (absolute paths are
# used below because a regular user cannot cd into it)
sudo install -d -m 700 -o duckdb -g duckdb /opt/duckdb/ssl
# Generate private key
sudo openssl genrsa -out /opt/duckdb/ssl/cluster.key 2048
# Generate certificate signing request
sudo openssl req -new -key /opt/duckdb/ssl/cluster.key -out /opt/duckdb/ssl/cluster.csr -subj "/C=US/ST=State/L=City/O=Organization/CN=duckdb-cluster"
# Generate self-signed certificate
sudo openssl x509 -req -in /opt/duckdb/ssl/cluster.csr -signkey /opt/duckdb/ssl/cluster.key -out /opt/duckdb/ssl/cluster.crt -days 365
# Set proper permissions
sudo chown duckdb:duckdb /opt/duckdb/ssl/cluster.key /opt/duckdb/ssl/cluster.csr /opt/duckdb/ssl/cluster.crt
sudo chmod 600 /opt/duckdb/ssl/cluster.key
sudo chmod 644 /opt/duckdb/ssl/cluster.crt
Create cluster management script
Save the following as /opt/duckdb/cluster/manage.py. It checks node health and sends queries to the coordinator node, and it requires the requests package.
#!/usr/bin/env python3
import requests
import json
import argparse
import configparser
import sys
class DuckDBClusterManager:
    """Client-side view of the cluster described in cluster.conf."""

    def __init__(self, config_file='/opt/duckdb/cluster/cluster.conf'):
        """Load every section whose name starts with "node" into ``self.nodes``.

        ``self.nodes`` maps section name -> dict of that section's keys.
        configparser silently skips unreadable files, so a warning is printed
        and ``self.nodes`` stays empty in that case.
        """
        self.config = configparser.ConfigParser()
        if not self.config.read(config_file):
            print(f"Warning: could not read cluster config {config_file}")
        self.nodes = {
            section: dict(self.config[section])
            for section in self.config.sections()
            if section.startswith('node')
        }

    @staticmethod
    def _request_headers():
        """Return an Authorization header when DUCKDB_CLUSTER_TOKEN is set."""
        import os

        token = os.environ.get('DUCKDB_CLUSTER_TOKEN')
        return {'Authorization': f'Bearer {token}'} if token else {}

    def check_cluster_status(self):
        """Print each node's health.

        Returns:
            True only if at least one node is configured and every node
            answered GET /health with HTTP 200.
        """
        print("Checking cluster status...")
        if not self.nodes:
            print("No nodes configured")
            return False
        all_healthy = True
        for node_name, node_info in self.nodes.items():
            address = f"{node_info['host']}:{node_info['port']}"
            try:
                response = requests.get(f"http://{address}/health", timeout=5)
            except requests.RequestException:
                print(f"✗ {node_name} ({address}) - UNREACHABLE")
                all_healthy = False
                continue
            if response.status_code == 200:
                print(f"✓ {node_name} ({address}) - HEALTHY")
            else:
                print(f"✗ {node_name} ({address}) - UNHEALTHY")
                all_healthy = False
        return all_healthy

    def execute_distributed_query(self, query):
        """Send *query* to the node whose role is "coordinator".

        Returns:
            The ``data`` list from the coordinator's JSON reply (possibly
            empty). Returns None on any failure; the reason is printed.
        """
        coordinator_node = next(
            (info for info in self.nodes.values() if info.get('role') == 'coordinator'),
            None,
        )
        if coordinator_node is None:
            print("No coordinator node found")
            return None
        url = f"http://{coordinator_node['host']}:{coordinator_node['port']}/execute"
        try:
            response = requests.post(url, json={'query': query},
                                     headers=self._request_headers(), timeout=30)
        except requests.RequestException as e:
            print(f"Failed to execute query: {e}")
            return None
        if response.status_code != 200:
            print(f"Query failed: {response.text}")
            return None
        try:
            return response.json().get('data')
        except ValueError:
            print(f"Coordinator returned invalid JSON: {response.text[:200]}")
            return None
def main():
    """Parse the command line and run ``status`` or ``query``.

    Exits with status 1 in three cases: ``query`` is used without ``--sql``,
    the query fails, or the status check returns False.
    """
    parser = argparse.ArgumentParser(description='DuckDB Cluster Manager')
    parser.add_argument('command', choices=['status', 'query'], help='Command to execute')
    parser.add_argument('--sql', help='SQL query to execute')
    args = parser.parse_args()
    manager = DuckDBClusterManager()
    if args.command == 'status':
        # An explicit False means at least one node is not healthy.
        if manager.check_cluster_status() is False:
            sys.exit(1)
        return
    if not args.sql:
        print("--sql parameter required for query command")
        sys.exit(1)
    result = manager.execute_distributed_query(args.sql)
    if result is None:
        # execute_distributed_query has already printed the reason.
        sys.exit(1)
    # Print even an empty list: a zero-row result is still a success.
    print(json.dumps(result, indent=2))


if __name__ == '__main__':
    main()
Start cluster services
Enable and start the DuckDB cluster services on each node.
# Make scripts executable
sudo chmod +x /opt/duckdb/cluster/coordinator.py
sudo chmod +x /opt/duckdb/cluster/manage.py
# Reload systemd and start services
sudo systemctl daemon-reload
# Start each node (use node2, node3, ... on the other hosts)
sudo systemctl enable duckdb-cluster@node1
sudo systemctl start duckdb-cluster@node1
# Check service status
sudo systemctl status duckdb-cluster@node1 --no-pager
Configure performance monitoring
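Save the following as /opt/duckdb/cluster/monitor.py. It appends CPU, memory, disk and network metrics to /opt/duckdb/logs/performance.log every minute and requires the psutil package.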
#!/usr/bin/env python3
import psutil
import duckdb
import time
import json
from datetime import datetime
class DuckDBMonitor:
    """Append host metrics to a JSON-lines log and time a probe query."""

    def __init__(self, log_file='/opt/duckdb/logs/performance.log',
                 data_path='/opt/duckdb/data'):
        """
        Args:
            log_file: File that collect_metrics() appends one JSON object per line to.
            data_path: Path whose filesystem usage is reported as ``disk_usage``.
        """
        self.log_file = log_file
        self.data_path = data_path

    def collect_metrics(self):
        """Sample CPU, memory, disk and network counters and append them to the log.

        Returns:
            The metrics dict that was written.
        """
        metrics = {
            # Local time including the UTC offset, so logs from different nodes line up.
            'timestamp': datetime.now().astimezone().isoformat(),
            # Without an interval, psutil measures since the previous call; the
            # first call in a process returns 0.0.
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage(self.data_path).percent,
            'network_io': psutil.net_io_counters()._asdict(),
        }
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(metrics) + '\n')
        return metrics

    def check_query_performance(self, connection):
        """Time a trivial catalog query on *connection*.

        Returns:
            Elapsed seconds as a float, or None if the query failed.
        """
        try:
            # perf_counter is monotonic; time.time() can jump with clock changes.
            start = time.perf_counter()
            connection.execute("SELECT COUNT(*) FROM information_schema.tables").fetchone()
            elapsed = time.perf_counter() - start
        except Exception as e:
            print(f"Performance check failed: {e}")
            return None
        print(f"Query execution time: {elapsed:.3f} seconds")
        return elapsed
if __name__ == '__main__':
    monitor = DuckDBMonitor()
    try:
        while True:
            metrics = monitor.collect_metrics()
            print(f"CPU: {metrics['cpu_percent']}%, Memory: {metrics['memory_percent']}%", flush=True)
            time.sleep(60)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the monitor; exit without a traceback.
        print("Monitoring stopped")
Verify your setup
Test cluster functionality and verify all components are working correctly.
# Check cluster status
python3 /opt/duckdb/cluster/manage.py status
# Test basic connectivity (the health probe manage.py uses is a GET)
curl http://localhost:8081/health
# Verify DuckDB installation
duckdb --version
# Check recent service logs (add -f to keep following them)
sudo journalctl -u duckdb-cluster@node1 -n 50 --no-pager
Test distributed query execution:
# Execute test query through cluster manager
python3 /opt/duckdb/cluster/manage.py query --sql "SELECT 'Cluster is working!' as status"
# Check performance monitoring; run as duckdb so it can write /opt/duckdb/logs (Ctrl+C to stop)
sudo -u duckdb python3 /opt/duckdb/cluster/monitor.py
Configure advanced features
Set up data partitioning
Partition data by date so that queries only read the files they need. DuckDB does not spread a table across nodes, so this partitioning applies to each node's own data.
-- DuckDB has no declarative PARTITION BY RANGE / PARTITION OF (that is
-- PostgreSQL syntax). Keep a regular table and export it as Hive-partitioned
-- Parquet, which DuckDB prunes by partition column at read time.
CREATE TABLE sales_data (
    id INTEGER,
    transaction_date DATE,
    amount DECIMAL(10,2),
    region VARCHAR(50)
);

-- Write one directory per month: .../sales/year=2024/month=1/*.parquet
COPY (
    SELECT *, year(transaction_date) AS year, month(transaction_date) AS month
    FROM sales_data
) TO '/opt/duckdb/data/sales' (FORMAT PARQUET, PARTITION_BY (year, month), OVERWRITE_OR_IGNORE);

-- Only the files under year=2024/month=1 are read for this query
SELECT region, sum(amount) AS total
FROM read_parquet('/opt/duckdb/data/sales/*/*/*.parquet', hive_partitioning = true)
WHERE year = 2024 AND month = 1
GROUP BY region;

-- Configure parallel processing
SET threads TO 4;
SET memory_limit = '2GB';
SET enable_progress_bar = true;
Implement load balancing
Put HAProxy (or a similar load balancer) in front of the nodes by adding the following to /etc/haproxy/haproxy.cfg.
# Round-robin HTTP load balancer in front of the DuckDB node APIs.
global
daemon
user haproxy
group haproxy
defaults
mode http
timeout connect 5s
timeout client 60s
timeout server 60s
# Clients send the same requests here (e.g. POST /execute) that they would send to a node.
frontend duckdb_frontend
bind *:8080
default_backend duckdb_cluster
# NOTE: each node answers from its own local database file (node_<id>.db),
# so rotating requests only gives consistent results if every node holds the
# same data. "check" without "option httpchk" is a plain TCP connect check.
backend duckdb_cluster
balance roundrobin
server node1 203.0.113.10:8081 check
server node2 203.0.113.11:8082 check
server node3 203.0.113.12:8083 check
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Node connection refused | Service not running or firewall blocking | sudo systemctl start duckdb-cluster@node1 and check firewall rules |
| Out of memory errors | Insufficient memory allocation | Adjust the DuckDB memory_limit set in coordinator.py (the max_memory key in cluster.conf is not read by the coordinator) |
| Query timeout | Network latency or resource constraints | Increase timeout values and check network connectivity |
| SSL certificate errors | Certificate not properly configured | Regenerate certificates and verify permissions |
| Permission denied on data directory | Incorrect file ownership | sudo chown -R duckdb:duckdb /opt/duckdb |
Performance optimization
For high-performance workloads, consider integrating with existing data pipeline tools. The DuckDB with Apache Airflow tutorial shows how to automate analytical workflows.
Monitor cluster performance using centralized logging approaches covered in the Netdata centralized monitoring guide.
Next steps
- Automate DuckDB backups with S3 storage
- Monitor DuckDB performance with Prometheus and Grafana
- Implement SSL encryption and authentication for DuckDB
- Optimize DuckDB for time-series data workloads
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# DuckDB Cluster Installation Script
# Production-quality installer for distributed analytics setup
# Color codes for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Default values
NODE_ID="${1:-1}"
CLUSTER_NODES="${2:-localhost:8080}"
DATA_DIR="/opt/duckdb"
# Set to 1 by the install steps that create these resources, so cleanup only
# undoes what this run added (a failed re-run must not wipe a working node).
CREATED_USER=0
CREATED_DATA_DIR=0
CREATED_UNIT=0
usage() {
    echo "Usage: $0 [node_id] [cluster_nodes]"
    echo " node_id: Unique identifier for this node (default: 1)"
    echo " cluster_nodes: Comma-separated list of cluster nodes (default: localhost:8080)"
    echo "Example: $0 1 \"node1:8080,node2:8080,node3:8080\""
    exit 1
}
log() {
    echo -e "${GREEN}[INFO]${NC} $1"
}
warn() {
    echo -e "${YELLOW}[WARN]${NC} $1" >&2
}
error() {
    echo -e "${RED}[ERROR]${NC} $1" >&2
    exit 1
}
cleanup() {
    warn "Installation failed, performing cleanup..."
    # Only stop/remove the service if this run created its unit file;
    # otherwise a failed upgrade would take down a working node.
    if [[ "${CREATED_UNIT:-0}" == 1 ]]; then
        systemctl stop duckdb-cluster 2>/dev/null || true
        systemctl disable duckdb-cluster 2>/dev/null || true
        rm -f /etc/systemd/system/duckdb-cluster.service
        systemctl daemon-reload 2>/dev/null || true
    fi
    # Never delete a pre-existing user or data directory: it may hold the
    # node's databases from an earlier install.
    if [[ "${CREATED_USER:-0}" == 1 ]]; then
        userdel duckdb 2>/dev/null || true
    fi
    if [[ "${CREATED_DATA_DIR:-0}" == 1 ]]; then
        rm -rf "$DATA_DIR" 2>/dev/null || true
    fi
}
trap cleanup ERR
# Show usage for -h/--help before touching the system
if [[ "${1:-}" == "-h" || "${1:-}" == "--help" ]]; then
    usage
fi
# Check if running as root or with sudo
if [[ $EUID -ne 0 ]]; then
    error "This script must be run as root or with sudo"
fi
# The DuckDB CLI archive downloaded later is the linux-amd64 build
ARCH="$(uname -m)"
if [[ "$ARCH" != "x86_64" ]]; then
    error "Unsupported architecture: $ARCH (this installer fetches the linux-amd64 DuckDB CLI)"
fi
# Auto-detect distribution
if [ -f /etc/os-release ]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_UPDATE="apt update && apt upgrade -y"
            PKG_INSTALL="apt install -y"
            PYTHON_PKG="python3 python3-pip python3-dev build-essential"
            FIREWALL_CMD="ufw"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            PKG_MGR="dnf"
            PKG_UPDATE="dnf update -y"
            PKG_INSTALL="dnf install -y"
            PYTHON_PKG="python3 python3-pip gcc gcc-c++ make"
            FIREWALL_CMD="firewall-cmd"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_UPDATE="yum update -y"
            PKG_INSTALL="yum install -y"
            PYTHON_PKG="python3 python3-pip gcc gcc-c++ make"
            FIREWALL_CMD="firewall-cmd"
            ;;
        *)
            error "Unsupported distribution: $ID"
            ;;
    esac
else
    error "Cannot detect distribution"
fi
echo "[1/8] Updating system packages..."
# PKG_UPDATE may chain commands with '&&' (Debian/Ubuntu); a plain $PKG_UPDATE
# expansion would pass '&&' to apt as a literal argument, so evaluate it.
eval "$PKG_UPDATE"
echo "[2/8] Installing dependencies..."
# Word splitting of the package lists is intentional here
$PKG_INSTALL wget curl unzip $PYTHON_PKG
echo "[3/8] Installing DuckDB CLI..."
# Private scratch directory: no stale files and no overwrite prompt from unzip
WORK_DIR="$(mktemp -d)"
wget -q -O "$WORK_DIR/duckdb_cli.zip" https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.zip
unzip -oq "$WORK_DIR/duckdb_cli.zip" -d "$WORK_DIR"
install -m 755 "$WORK_DIR/duckdb" /usr/local/bin/duckdb
rm -rf "$WORK_DIR"
echo "[4/8] Installing DuckDB Python client..."
# Don't self-upgrade the distro-managed pip. Where the system Python is marked
# externally managed (PEP 668, e.g. Ubuntu 23.04+, Debian 12), plain pip
# refuses to install, so retry with the explicit opt-out.
python3 -m pip install duckdb || python3 -m pip install --break-system-packages duckdb
echo "[5/8] Creating DuckDB system user and directories..."
# Record what this run creates so cleanup never removes pre-existing state
if ! id -u duckdb >/dev/null 2>&1; then
    useradd -r -s /bin/false -d "$DATA_DIR" -c "DuckDB Service User" duckdb
    CREATED_USER=1
fi
if [[ ! -d "$DATA_DIR" ]]; then
    CREATED_DATA_DIR=1
fi
mkdir -p "$DATA_DIR"/{cluster,data,logs}
chown -R duckdb:duckdb "$DATA_DIR"
chmod 750 "$DATA_DIR"
chmod 750 "$DATA_DIR"/{cluster,data,logs}
echo "[6/8] Creating cluster coordinator script..."
cat > "$DATA_DIR/cluster/coordinator.py" << 'EOF'
#!/usr/bin/env python3
"""Single DuckDB cluster node exposing GET /health and POST /execute over HTTP."""
import duckdb
import json
import os
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler


class DuckDBClusterCoordinator:
    def __init__(self, node_id, cluster_nodes, data_dir='/opt/duckdb/data'):
        self.node_id = node_id
        self.cluster_nodes = cluster_nodes.split(',')
        self.data_dir = data_dir
        self.connection = None

    def initialize_node(self):
        try:
            db_path = f"{self.data_dir}/node_{self.node_id}.db"
            self.connection = duckdb.connect(db_path)
            # Configure for cluster operations
            self.connection.execute("PRAGMA threads=4")
            self.connection.execute("PRAGMA memory_limit='2GB'")
            self.connection.execute("SET enable_progress_bar=true")
            self.connection.execute("SET preserve_insertion_order=false")
            print(f"Node {self.node_id} initialized successfully", flush=True)
            return True
        except Exception as e:
            print(f"Failed to initialize node {self.node_id}: {e}", file=sys.stderr, flush=True)
            return False

    def execute_query(self, query):
        """Return all rows of *query*, or None if it fails (error logged)."""
        if not self.connection:
            return None
        try:
            return self.connection.execute(query).fetchall()
        except Exception as e:
            print(f"Query execution failed: {e}", file=sys.stderr, flush=True)
            return None


class ClusterHTTPHandler(BaseHTTPRequestHandler):
    def _send_json(self, status, payload):
        # Encode before sending the status line so a failure can still be a
        # clean 500; default=str covers Decimal/date/timestamp result values.
        try:
            body = json.dumps(payload, default=str).encode('utf-8')
        except (TypeError, ValueError):
            self.send_error(500, 'Could not encode query result')
            return
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        if self.path == '/health':
            self._send_json(200, {'status': 'healthy', 'node': coordinator.node_id})
        else:
            self.send_error(404)

    def do_POST(self):
        if self.path != '/execute':
            self.send_error(404)
            return
        try:
            content_length = int(self.headers.get('Content-Length') or 0)
            if content_length < 0:
                raise ValueError('negative Content-Length')
            data = json.loads(self.rfile.read(content_length).decode('utf-8'))
        except ValueError:
            self.send_error(400, 'Request body must be JSON')
            return
        query = data.get('query') if isinstance(data, dict) else None
        if not query:
            self._send_json(400, {'status': 'error', 'message': 'No query provided'})
            return
        result = coordinator.execute_query(query)
        if result is None:
            self._send_json(500, {'status': 'error', 'message': 'Query failed; see error log'})
        else:
            self._send_json(200, {'status': 'success', 'data': result})


if __name__ == '__main__':
    node_id = os.environ.get('NODE_ID', '1')
    cluster_nodes = os.environ.get('CLUSTER_NODES', 'localhost:8080')
    port = int(os.environ.get('PORT', '8080'))
    coordinator = DuckDBClusterCoordinator(node_id, cluster_nodes)
    if not coordinator.initialize_node():
        sys.exit(1)
    server = HTTPServer(('0.0.0.0', port), ClusterHTTPHandler)
    print(f"DuckDB cluster node {node_id} starting on port {port}", flush=True)
    server.serve_forever()
EOF
chmod 750 "$DATA_DIR/cluster/coordinator.py"
chown duckdb:duckdb "$DATA_DIR/cluster/coordinator.py"
echo "[7/8] Creating systemd service..."
UNIT_FILE=/etc/systemd/system/duckdb-cluster.service
# Remember whether the unit is new, so cleanup never removes a pre-existing one
if [[ ! -e "$UNIT_FILE" ]]; then
    CREATED_UNIT=1
fi
# PYTHONPATH is not set: systemd does not glob-expand Environment= values, and
# packages installed by root's pip are on python3's default search path.
cat > "$UNIT_FILE" << EOF
[Unit]
Description=DuckDB Cluster Node
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
User=duckdb
Group=duckdb
WorkingDirectory=$DATA_DIR
Environment=NODE_ID=$NODE_ID
Environment=CLUSTER_NODES=$CLUSTER_NODES
Environment=PORT=8080
# Write print() output to the log files immediately instead of buffering it
Environment=PYTHONUNBUFFERED=1
ExecStart=/usr/bin/python3 $DATA_DIR/cluster/coordinator.py
Restart=always
RestartSec=10
StandardOutput=append:$DATA_DIR/logs/cluster.log
StandardError=append:$DATA_DIR/logs/error.log
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable duckdb-cluster
echo "[8/8] Configuring firewall..."
# /execute runs arbitrary SQL without authentication. Set ALLOWED_CIDR
# (e.g. ALLOWED_CIDR=10.0.0.0/24) to accept port 8080 only from the cluster network.
ALLOWED_CIDR="${ALLOWED_CIDR:-}"
if [[ -z "$ALLOWED_CIDR" ]]; then
    warn "ALLOWED_CIDR not set: port 8080 will be open to all sources"
fi
if command -v ufw >/dev/null 2>&1; then
    if [[ -n "$ALLOWED_CIDR" ]]; then
        ufw allow from "$ALLOWED_CIDR" to any port 8080 proto tcp
    else
        ufw allow 8080/tcp
    fi
elif command -v firewall-cmd >/dev/null 2>&1 && systemctl is-active --quiet firewalld; then
    # firewall-cmd fails (and would trigger cleanup) when firewalld isn't running
    if [[ -n "$ALLOWED_CIDR" ]]; then
        firewall-cmd --permanent --add-rich-rule="rule family=ipv4 source address=$ALLOWED_CIDR port port=8080 protocol=tcp accept"
    else
        firewall-cmd --permanent --add-port=8080/tcp
    fi
    firewall-cmd --reload
else
    warn "No active firewall manager found; restrict access to TCP port 8080 manually"
fi
log "Starting DuckDB cluster service..."
# restart rather than start, so a re-run picks up the regenerated script and unit
systemctl restart duckdb-cluster
sleep 3
echo "Verifying installation..."
if systemctl is-active --quiet duckdb-cluster; then
    log "✓ DuckDB cluster service is running"
else
    error "✗ DuckDB cluster service failed to start (see $DATA_DIR/logs/error.log)"
fi
if /usr/local/bin/duckdb --version >/dev/null 2>&1; then
    log "✓ DuckDB CLI is working"
else
    error "✗ DuckDB CLI installation failed"
fi
# Any HTTP response (even an error status) proves the node is listening
if curl -s -o /dev/null http://localhost:8080/health; then
    log "✓ Cluster HTTP interface is accessible"
else
    warn "✗ Cluster HTTP interface not responding (may need time to start)"
fi
log "DuckDB cluster installation completed successfully!"
log "Node ID: $NODE_ID"
log "Cluster nodes: $CLUSTER_NODES"
log "Data directory: $DATA_DIR"
log "Service status: systemctl status duckdb-cluster"
# The unit appends the node's stdout/stderr to files, not to the journal
log "Logs: $DATA_DIR/logs/cluster.log and $DATA_DIR/logs/error.log"
Review the script before running it. It must run as root: sudo bash install.sh [node_id] [cluster_nodes]