Configure advanced htop monitoring with custom scripts and alerting

Advanced 45 min May 09, 2026 69 views
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Build a production-grade monitoring system using custom-compiled htop with automated process analysis scripts and multi-channel alerting for proactive infrastructure management.

Prerequisites

  • Root access to the server
  • Python 3.8 or newer
  • Development tools for compilation
  • Basic understanding of systemd services
  • Email server configuration for alerts

What this solves

htop provides real-time process monitoring, but production environments need automated analysis and alerting. This tutorial creates an advanced monitoring system with custom htop compilation, automated scripts for process analysis, and alerting through email and webhooks. You'll build a comprehensive solution that detects resource bottlenecks, zombie processes, and performance anomalies before they impact your services.

Step-by-step configuration

Install build dependencies

Install the required development tools and libraries to compile htop with custom features.

sudo apt update
sudo apt install -y build-essential autotools-dev autoconf automake pkg-config
sudo apt install -y libncurses5-dev libncursesw5-dev libprocps-dev libsensors4-dev
sudo apt install -y libnl-3-dev libnl-genl-3-dev git curl jq
sudo dnf update -y
sudo dnf groupinstall -y "Development Tools"
sudo dnf install -y ncurses-devel procps-ng-devel lm_sensors-devel
sudo dnf install -y libnl3-devel autoconf automake pkg-config git curl jq

Download and compile custom htop

Clone the latest htop source and compile with enhanced monitoring features enabled.

cd /tmp
git clone https://github.com/htop-dev/htop.git
cd htop
./autogen.sh
./configure --enable-unicode --enable-sensors --enable-capabilities
make -j$(nproc)
sudo make install
sudo ldconfig

Create monitoring directories

Set up the directory structure for monitoring scripts, configurations, and logs.

sudo mkdir -p /opt/htop-monitor/{bin,config,logs,data}
sudo mkdir -p /var/log/htop-monitor
sudo chown -R $USER:$USER /opt/htop-monitor
sudo chmod 755 /opt/htop-monitor /var/log/htop-monitor

Configure advanced htop settings

Create a system-wide htop configuration with custom columns and monitoring-optimized display.

fields=0 48 17 18 38 39 40 2 46 47 49 1
sort_key=46
sort_direction=-1
hide_threads=0
hide_kernel_threads=0
hide_userland_threads=0
shadow_other_users=1
show_thread_names=1
show_program_path=1
highlight_base_name=1
highlight_megabytes=1
highlight_threads=1
highlight_changes=0
highlight_changes_delay_secs=5
find_comm_in_cmdline=1
strip_exe_from_cmdline=1
show_merged_command=0
tree_view=1
tree_view_always_by_pid=0
header_margin=1
detailed_cpu_time=1
cpu_count_from_one=1
show_cpu_usage=1
show_cpu_frequency=1
show_cpu_temperature=1
degree_fahrenheit=0
update_process_names=1
account_guest_in_cpu_meter=0
color_scheme=0
enable_mouse=1
delay=10
left_meters=LeftCPUs2 Memory Swap
left_meter_modes=1 1 1
right_meters=RightCPUs2 Tasks LoadAverage Uptime
right_meter_modes=1 2 2 2
hide_function_bar=0

Create process monitoring script

Build the main monitoring script that analyzes htop output and detects performance issues.

#!/usr/bin/env python3
import subprocess
import json
import time
import re
import os
import logging
from datetime import datetime
from collections import defaultdict

Configure logging

logging.basicConfig( filename='/var/log/htop-monitor/monitor.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class ProcessMonitor: def __init__(self, config_file='/opt/htop-monitor/config/monitor.json'): self.config = self.load_config(config_file) self.alerts_sent = defaultdict(int) self.last_check = {} def load_config(self, config_file): try: with open(config_file, 'r') as f: return json.load(f) except FileNotFoundError: return self.get_default_config() def get_default_config(self): return { "thresholds": { "cpu_percent": 80.0, "memory_percent": 85.0, "zombie_count": 5, "load_average_1m": 4.0, "process_count": 500 }, "monitoring": { "interval": 30, "alert_cooldown": 300, "max_alerts_per_hour": 10 }, "alerts": { "email_enabled": True, "webhook_enabled": True, "webhook_url": "http://localhost:8080/webhook", "email_recipient": "admin@example.com" } } def get_system_stats(self): """Extract system statistics using htop batch mode""" try: # Use htop in batch mode with our custom config env = os.environ.copy() env['HTOPRC'] = '/opt/htop-monitor/config/htoprc' result = subprocess.run( ['htop', '-d', '1', '-n', '1', '--no-color'], capture_output=True, text=True, env=env, timeout=10 ) if result.returncode != 0: logging.error(f"htop failed: {result.stderr}") return None return self.parse_htop_output(result.stdout) except subprocess.TimeoutExpired: logging.error("htop command timed out") return None except Exception as e: logging.error(f"Error running htop: {e}") return None def parse_htop_output(self, output): """Parse htop output to extract metrics""" lines = output.strip().split('\n') stats = { 'timestamp': datetime.now().isoformat(), 'processes': [], 'system': {}, 'alerts': [] } # Parse system information from header for line in lines[:10]: # First 10 lines usually contain system info if 'Tasks:' in line: match = re.search(r'Tasks:\s+(\d+)\s+total', line) if match: stats['system']['total_tasks'] = int(match.group(1)) elif 'Load average:' in line: match = re.search(r'Load average:\s+([\d.]+)', line) if match: stats['system']['load_average'] = float(match.group(1)) elif 'CPU[s]:' in line or 'Cpu(s):' in line: # Extract CPU usage match = re.search(r'([\d.]+)%\s*us', line) if match: stats['system']['cpu_user'] = float(match.group(1)) elif 'Mem:' in line or 'Memory:' in line: # Extract memory usage match = re.search(r'([\d.]+)G\s*used', line) if match: stats['system']['memory_used_gb'] = float(match.group(1)) # Parse process information process_header_found = False for line in lines: if 'PID' in line and 'USER' in line and 'CPU%' in line: process_header_found = True continue if process_header_found and line.strip(): process_data = self.parse_process_line(line) if process_data: stats['processes'].append(process_data) return stats def parse_process_line(self, line): """Parse individual process line from htop output""" parts = line.split() if len(parts) < 8: return None try: return { 'pid': int(parts[0]), 'user': parts[1], 'cpu_percent': float(parts[2]) if parts[2] != '-' else 0.0, 'memory_percent': float(parts[3]) if parts[3] != '-' else 0.0, 'state': parts[7] if len(parts) > 7 else 'unknown', 'command': ' '.join(parts[8:]) if len(parts) > 8 else 'unknown' } except (ValueError, IndexError): return None def check_thresholds(self, stats): """Check if any metrics exceed configured thresholds""" alerts = [] thresholds = self.config['thresholds'] # Check system-wide metrics if 'load_average' in stats['system']: if stats['system']['load_average'] > thresholds['load_average_1m']: alerts.append({ 'type': 'high_load', 'severity': 'warning', 'value': stats['system']['load_average'], 'threshold': thresholds['load_average_1m'], 'message': f"High load average: {stats['system']['load_average']}" }) # Check for zombie processes zombie_count = sum(1 for p in stats['processes'] if p['state'] == 'Z') if zombie_count > thresholds['zombie_count']: alerts.append({ 'type': 'zombie_processes', 'severity': 'critical', 'value': zombie_count, 'threshold': thresholds['zombie_count'], 'message': f"High zombie process count: {zombie_count}" }) # Check for high CPU processes high_cpu_processes = [p for p in stats['processes'] if p['cpu_percent'] > thresholds['cpu_percent']] if high_cpu_processes: alerts.append({ 'type': 'high_cpu_processes', 'severity': 'warning', 'value': len(high_cpu_processes), 'processes': high_cpu_processes[:5], # Top 5 processes 'message': f"Found {len(high_cpu_processes)} high CPU processes" }) # Check for high memory processes high_mem_processes = [p for p in stats['processes'] if p['memory_percent'] > thresholds['memory_percent']] if high_mem_processes: alerts.append({ 'type': 'high_memory_processes', 'severity': 'warning', 'value': len(high_mem_processes), 'processes': high_mem_processes[:5], # Top 5 processes 'message': f"Found {len(high_mem_processes)} high memory processes" }) return alerts def send_alerts(self, alerts): """Send alerts via configured channels""" if not alerts: return for alert in alerts: alert_key = f"{alert['type']}_{alert.get('value', '')}" current_time = time.time() # Check cooldown period if (alert_key in self.last_check and current_time - self.last_check[alert_key] < self.config['monitoring']['alert_cooldown']): continue # Check rate limiting if self.alerts_sent[alert_key] >= self.config['monitoring']['max_alerts_per_hour']: continue # Send email alert if self.config['alerts']['email_enabled']: self.send_email_alert(alert) # Send webhook alert if self.config['alerts']['webhook_enabled']: self.send_webhook_alert(alert) self.last_check[alert_key] = current_time self.alerts_sent[alert_key] += 1 logging.info(f"Alert sent: {alert['message']}") def send_email_alert(self, alert): """Send email alert using system mail""" try: subject = f"htop Alert: {alert['type']}" body = f""" Alert Details: Type: {alert['type']} Severity: {alert['severity']} Message: {alert['message']} Timestamp: {datetime.now().isoformat()} Value: {alert.get('value', 'N/A')} Threshold: {alert.get('threshold', 'N/A')} """ if 'processes' in alert: body += "\nTop Processes:\n" for proc in alert['processes']: body += f" PID {proc['pid']}: {proc['command']} (CPU: {proc['cpu_percent']}%, MEM: {proc['memory_percent']}%)\n" cmd = ['mail', '-s', subject, self.config['alerts']['email_recipient']] subprocess.run(cmd, input=body, text=True, check=True) except Exception as e: logging.error(f"Failed to send email alert: {e}") def send_webhook_alert(self, alert): """Send webhook alert""" try: payload = { 'timestamp': datetime.now().isoformat(), 'hostname': subprocess.check_output(['hostname'], text=True).strip(), 'alert': alert } cmd = ['curl', '-X', 'POST', '-H', 'Content-Type: application/json', '-d', json.dumps(payload), self.config['alerts']['webhook_url']] subprocess.run(cmd, timeout=10, check=True) except Exception as e: logging.error(f"Failed to send webhook alert: {e}") def save_stats(self, stats): """Save statistics to data file for historical analysis""" data_file = f"/opt/htop-monitor/data/stats_{datetime.now().strftime('%Y%m%d')}.jsonl" try: with open(data_file, 'a') as f: f.write(json.dumps(stats) + '\n') except Exception as e: logging.error(f"Failed to save stats: {e}") def run_monitor_cycle(self): """Run one complete monitoring cycle""" stats = self.get_system_stats() if not stats: return alerts = self.check_thresholds(stats) stats['alerts'] = alerts self.send_alerts(alerts) self.save_stats(stats) logging.info(f"Monitor cycle completed. Found {len(alerts)} alerts.") if __name__ == '__main__': monitor = ProcessMonitor() monitor.run_monitor_cycle()

Create monitoring configuration

Set up the JSON configuration file with customizable thresholds and alert settings.

{
  "thresholds": {
    "cpu_percent": 80.0,
    "memory_percent": 85.0,
    "zombie_count": 5,
    "load_average_1m": 4.0,
    "process_count": 500,
    "disk_io_wait": 20.0
  },
  "monitoring": {
    "interval": 30,
    "alert_cooldown": 300,
    "max_alerts_per_hour": 10,
    "retention_days": 7,
    "batch_size": 100
  },
  "alerts": {
    "email_enabled": true,
    "webhook_enabled": true,
    "webhook_url": "http://localhost:8080/webhook",
    "email_recipient": "admin@example.com",
    "slack_webhook": "",
    "teams_webhook": ""
  },
  "filters": {
    "exclude_processes": ["kthreadd", "ksoftirqd", "migration"],
    "include_only_users": [],
    "minimum_cpu_threshold": 0.1,
    "minimum_memory_threshold": 0.1
  },
  "custom_columns": {
    "show_network_io": true,
    "show_disk_io": true,
    "show_file_descriptors": true,
    "show_cpu_affinity": false
  }
}

Create process analysis script

Build an advanced analysis script that provides trend analysis and performance insights.

#!/usr/bin/env python3
import json
import os
import glob
import argparse
import statistics
from datetime import datetime, timedelta
from collections import defaultdict, Counter

class ProcessAnalyzer:
    def __init__(self, data_dir='/opt/htop-monitor/data'):
        self.data_dir = data_dir
        
    def load_data_files(self, days_back=7):
        """Load process data from the last N days"""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)
        
        data = []
        
        for day_offset in range(days_back + 1):
            date = start_date + timedelta(days=day_offset)
            date_str = date.strftime('%Y%m%d')
            pattern = f"{self.data_dir}/stats_{date_str}.jsonl"
            
            for file_path in glob.glob(pattern):
                try:
                    with open(file_path, 'r') as f:
                        for line in f:
                            if line.strip():
                                data.append(json.loads(line))
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
        
        return sorted(data, key=lambda x: x['timestamp'])
    
    def analyze_cpu_trends(self, data):
        """Analyze CPU usage trends across processes"""
        cpu_by_process = defaultdict(list)
        cpu_by_hour = defaultdict(list)
        
        for entry in data:
            timestamp = datetime.fromisoformat(entry['timestamp'])
            hour_key = timestamp.strftime('%H')
            
            for process in entry['processes']:
                process_name = process['command'].split()[0] if process['command'] else 'unknown'
                cpu_by_process[process_name].append(process['cpu_percent'])
                cpu_by_hour[hour_key].append(process['cpu_percent'])
        
        # Calculate statistics
        trends = {
            'top_cpu_consumers': {},
            'hourly_patterns': {},
            'cpu_spikes': []
        }
        
        # Top CPU consuming processes
        for process, cpu_values in cpu_by_process.items():
            if len(cpu_values) > 10:  # Only processes with sufficient data
                trends['top_cpu_consumers'][process] = {
                    'avg_cpu': statistics.mean(cpu_values),
                    'max_cpu': max(cpu_values),
                    'median_cpu': statistics.median(cpu_values),
                    'samples': len(cpu_values)
                }
        
        # Hourly CPU patterns
        for hour, cpu_values in cpu_by_hour.items():
            if cpu_values:
                trends['hourly_patterns'][hour] = {
                    'avg_cpu': statistics.mean(cpu_values),
                    'max_cpu': max(cpu_values),
                    'process_count': len(cpu_values)
                }
        
        return trends
    
    def analyze_memory_trends(self, data):
        """Analyze memory usage trends"""
        memory_by_process = defaultdict(list)
        system_memory = []
        
        for entry in data:
            if 'memory_used_gb' in entry.get('system', {}):
                system_memory.append(entry['system']['memory_used_gb'])
            
            for process in entry['processes']:
                process_name = process['command'].split()[0] if process['command'] else 'unknown'
                memory_by_process[process_name].append(process['memory_percent'])
        
        trends = {
            'system_memory': {},
            'top_memory_consumers': {},
            'memory_leaks_suspected': []
        }
        
        # System memory trends
        if system_memory:
            trends['system_memory'] = {
                'avg_usage': statistics.mean(system_memory),
                'max_usage': max(system_memory),
                'trend': 'increasing' if len(system_memory) > 1 and system_memory[-1] > system_memory[0] else 'stable'
            }
        
        # Top memory consuming processes
        for process, memory_values in memory_by_process.items():
            if len(memory_values) > 20:
                avg_mem = statistics.mean(memory_values)
                max_mem = max(memory_values)
                
                trends['top_memory_consumers'][process] = {
                    'avg_memory': avg_mem,
                    'max_memory': max_mem,
                    'samples': len(memory_values)
                }
                
                # Detect potential memory leaks (increasing trend)
                if len(memory_values) > 50:
                    first_quarter = memory_values[:len(memory_values)//4]
                    last_quarter = memory_values[-len(memory_values)//4:]
                    
                    if statistics.mean(last_quarter) > statistics.mean(first_quarter) * 1.5:
                        trends['memory_leaks_suspected'].append({
                            'process': process,
                            'initial_avg': statistics.mean(first_quarter),
                            'recent_avg': statistics.mean(last_quarter),
                            'increase_ratio': statistics.mean(last_quarter) / statistics.mean(first_quarter)
                        })
        
        return trends
    
    def analyze_process_stability(self, data):
        """Analyze process stability and lifecycle patterns"""
        process_appearances = Counter()
        process_states = defaultdict(Counter)
        zombie_events = []
        
        for entry in data:
            timestamp = entry['timestamp']
            
            for process in entry['processes']:
                process_name = process['command'].split()[0] if process['command'] else 'unknown'
                process_appearances[process_name] += 1
                process_states[process_name][process['state']] += 1
                
                if process['state'] == 'Z':  # Zombie process
                    zombie_events.append({
                        'timestamp': timestamp,
                        'pid': process['pid'],
                        'process': process_name,
                        'user': process['user']
                    })
        
        stability = {
            'most_frequent_processes': dict(process_appearances.most_common(10)),
            'process_state_analysis': {},
            'zombie_analysis': {
                'total_zombie_events': len(zombie_events),
                'affected_processes': list(set(event['process'] for event in zombie_events)),
                'recent_zombies': zombie_events[-10:] if zombie_events else []
            },
            'unstable_processes': []
        }
        
        # Analyze process states for stability issues
        for process, states in process_states.items():
            total_appearances = sum(states.values())
            if total_appearances > 10:
                stability['process_state_analysis'][process] = dict(states)
                
                # Check for unstable processes (high proportion of problematic states)
                problematic_states = states.get('Z', 0) + states.get('T', 0) + states.get('D', 0)
                if problematic_states / total_appearances > 0.1:
                    stability['unstable_processes'].append({
                        'process': process,
                        'total_appearances': total_appearances,
                        'problematic_ratio': problematic_states / total_appearances,
                        'states': dict(states)
                    })
        
        return stability
    
    def generate_report(self, days_back=7):
        """Generate comprehensive analysis report"""
        print(f"Loading data for the last {days_back} days...")
        data = self.load_data_files(days_back)
        
        if not data:
            print("No data found for analysis.")
            return
        
        print(f"Analyzing {len(data)} data points...\n")
        
        # CPU Analysis
        print("=== CPU TRENDS ANALYSIS ===")
        cpu_trends = self.analyze_cpu_trends(data)
        
        print("Top CPU Consumers (by average usage):")
        sorted_cpu = sorted(cpu_trends['top_cpu_consumers'].items(), 
                           key=lambda x: x[1]['avg_cpu'], reverse=True)
        for process, stats in sorted_cpu[:10]:
            print(f"  {process}: {stats['avg_cpu']:.1f}% avg, {stats['max_cpu']:.1f}% max ({stats['samples']} samples)")
        
        print("\nCPU Usage by Hour:")
        for hour in sorted(cpu_trends['hourly_patterns'].keys()):
            stats = cpu_trends['hourly_patterns'][hour]
            print(f"  {hour}:00 - Avg: {stats['avg_cpu']:.1f}%, Max: {stats['max_cpu']:.1f}%")
        
        # Memory Analysis
        print("\n=== MEMORY TRENDS ANALYSIS ===")
        memory_trends = self.analyze_memory_trends(data)
        
        if 'avg_usage' in memory_trends['system_memory']:
            sys_mem = memory_trends['system_memory']
            print(f"System Memory: {sys_mem['avg_usage']:.1f}GB avg, {sys_mem['max_usage']:.1f}GB max, Trend: {sys_mem['trend']}")
        
        print("\nTop Memory Consumers:")
        sorted_mem = sorted(memory_trends['top_memory_consumers'].items(), 
                           key=lambda x: x[1]['avg_memory'], reverse=True)
        for process, stats in sorted_mem[:10]:
            print(f"  {process}: {stats['avg_memory']:.1f}% avg, {stats['max_memory']:.1f}% max")
        
        if memory_trends['memory_leaks_suspected']:
            print("\nSuspected Memory Leaks:")
            for leak in memory_trends['memory_leaks_suspected']:
                print(f"  {leak['process']}: {leak['initial_avg']:.1f}% → {leak['recent_avg']:.1f}% ({leak['increase_ratio']:.1f}x increase)")
        
        # Stability Analysis
        print("\n=== PROCESS STABILITY ANALYSIS ===")
        stability = self.analyze_process_stability(data)
        
        print(f"Total zombie events: {stability['zombie_analysis']['total_zombie_events']}")
        if stability['zombie_analysis']['affected_processes']:
            print(f"Processes with zombies: {', '.join(stability['zombie_analysis']['affected_processes'])}")
        
        if stability['unstable_processes']:
            print("\nUnstable Processes:")
            for proc in stability['unstable_processes']:
                print(f"  {proc['process']}: {proc['problematic_ratio']:.1%} problematic states")
        
        print("\nMost Active Processes:")
        for process, count in list(stability['most_frequent_processes'].items())[:10]:
            print(f"  {process}: {count} appearances")

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Analyze htop monitoring data')
    parser.add_argument('--days', type=int, default=7, help='Number of days to analyze')
    args = parser.parse_args()
    
    analyzer = ProcessAnalyzer()
    analyzer.generate_report(args.days)

Create webhook server for alerts

Set up a simple webhook server to receive and process monitoring alerts.

#!/usr/bin/env python3
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from http.server import HTTPServer, BaseHTTPRequestHandler
from datetime import datetime
import logging
import threading
import time

Configure logging

logging.basicConfig( filename='/var/log/htop-monitor/webhook.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class WebhookHandler(BaseHTTPRequestHandler): def do_POST(self): """Handle incoming webhook alerts""" try: content_length = int(self.headers['Content-Length']) post_data = self.rfile.read(content_length) if self.path == '/webhook': alert_data = json.loads(post_data.decode('utf-8')) self.process_alert(alert_data) self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(json.dumps({'status': 'received'}).encode()) else: self.send_response(404) self.end_headers() except Exception as e: logging.error(f"Error processing webhook: {e}") self.send_response(500) self.end_headers() def process_alert(self, alert_data): """Process received alert and take appropriate actions""" alert = alert_data.get('alert', {}) hostname = alert_data.get('hostname', 'unknown') timestamp = alert_data.get('timestamp', datetime.now().isoformat()) logging.info(f"Received alert from {hostname}: {alert.get('message', 'No message')}") # Send to Slack if configured self.send_slack_notification(alert, hostname, timestamp) # Send to Microsoft Teams if configured self.send_teams_notification(alert, hostname, timestamp) # Log to file for historical tracking self.log_alert(alert, hostname, timestamp) def send_slack_notification(self, alert, hostname, timestamp): """Send alert to Slack webhook""" # Implementation for Slack webhook pass def send_teams_notification(self, alert, hostname, timestamp): """Send alert to Microsoft Teams webhook""" # Implementation for Teams webhook pass def log_alert(self, alert, hostname, timestamp): """Log alert to file""" log_entry = { 'timestamp': timestamp, 'hostname': hostname, 'alert_type': alert.get('type', 'unknown'), 'severity': alert.get('severity', 'unknown'), 'message': alert.get('message', ''), 'value': alert.get('value', ''), 'threshold': alert.get('threshold', '') } with open('/var/log/htop-monitor/alerts.log', 'a') as f: f.write(json.dumps(log_entry) + '\n') def log_message(self, format, *args): """Suppress default HTTP server logging""" pass class WebhookServer: def __init__(self, port=8080): self.port = port self.server = None def start(self): """Start the webhook server""" self.server = HTTPServer(('localhost', self.port), WebhookHandler) logging.info(f"Webhook server starting on port {self.port}") try: self.server.serve_forever() except KeyboardInterrupt: self.stop() def stop(self): """Stop the webhook server""" if self.server: logging.info("Shutting down webhook server") self.server.shutdown() self.server.server_close() if __name__ == '__main__': server = WebhookServer() server.start()

Create systemd service for continuous monitoring

Set up systemd services to run the monitoring script continuously.

[Unit]
Description=Advanced htop Process Monitor
After=network.target
Wants=network.target

[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/bin/python3 /opt/htop-monitor/bin/process_monitor.py
Restart=always
RestartSec=30
StandardOutput=journal
StandardError=journal
SyslogIdentifier=htop-monitor

Security settings

NoNewPrivileges=true PrivateTmp=true ProtectHome=true ProtectSystem=strict ReadWritePaths=/opt/htop-monitor /var/log/htop-monitor

Resource limits

CPUQuota=10% MemoryMax=256M [Install] WantedBy=multi-user.target

Create systemd timer for periodic monitoring

Set up a systemd timer to run monitoring checks at regular intervals.

[Unit]
Description=Run htop monitoring every 30 seconds
Requires=htop-monitor.service

[Timer]
OnBootSec=30
OnUnitActiveSec=30
AccuracySec=5s

[Install]
WantedBy=timers.target

Create webhook service

Set up the webhook server as a systemd service.

[Unit]
Description=htop Monitoring Webhook Server
After=network.target
Wants=network.target

[Service]
Type=simple
User=nobody
Group=nogroup
ExecStart=/usr/bin/python3 /opt/htop-monitor/bin/webhook_server.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=htop-webhook

Security settings

NoNewPrivileges=true PrivateTmp=true ProtectHome=true ProtectSystem=strict ReadWritePaths=/var/log/htop-monitor [Install] WantedBy=multi-user.target

Make scripts executable and set permissions

Set correct permissions for all monitoring scripts and ensure they can execute properly.

sudo chmod +x /opt/htop-monitor/bin/*.py
sudo chown -R root:root /opt/htop-monitor/bin
sudo chmod 644 /opt/htop-monitor/config/*
sudo chmod 755 /opt/htop-monitor/data
sudo chown -R root:root /etc/systemd/system/htop-*

Install mail utilities for email alerts

Install and configure mail utilities to enable email alerting functionality.

sudo apt install -y mailutils postfix
sudo dpkg-reconfigure postfix
sudo dnf install -y mailx postfix
sudo systemctl enable --now postfix

Enable and start monitoring services

Activate all monitoring components using systemd.

sudo systemctl daemon-reload
sudo systemctl enable htop-monitor.timer
sudo systemctl enable htop-webhook.service
sudo systemctl start htop-monitor.timer
sudo systemctl start htop-webhook.service

Create log rotation configuration

Configure logrotate to manage monitoring log files and prevent disk space issues.

/var/log/htop-monitor/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    create 644 root root
    postrotate
        systemctl reload htop-webhook.service || true
    endscript
}

/opt/htop-monitor/data/*.jsonl {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    create 644 root root
}

Verify your setup

Test the monitoring system and confirm all components are working correctly.

# Check service status
sudo systemctl status htop-monitor.timer
sudo systemctl status htop-webhook.service

Verify htop compilation

htop --version

Test monitoring script manually

sudo /usr/bin/python3 /opt/htop-monitor/bin/process_monitor.py

Check log files

sudo tail -f /var/log/htop-monitor/monitor.log sudo tail -f /var/log/htop-monitor/webhook.log

Test webhook server

curl -X POST http://localhost:8080/webhook \ -H "Content-Type: application/json" \ -d '{"test": "alert"}'

View recent monitoring data

ls -la /opt/htop-monitor/data/

Run analysis script

sudo /usr/bin/python3 /opt/htop-monitor/bin/analyze_processes.py --days 1

Common issues

Symptom Cause Fix
htop compilation fails Missing development packages Install all required -dev packages for your distribution
Monitor script fails with permission error Incorrect file permissions sudo chmod +x /opt/htop-monitor/bin/*.py
Email alerts not working Mail system not configured Configure postfix: sudo dpkg-reconfigure postfix
Webhook server returns 500 error JSON parsing error in alerts Check webhook logs: sudo journalctl -u htop-webhook -f
No monitoring data files created Directory permissions issue sudo chown -R root:root /opt/htop-monitor/data
Timer not triggering monitoring Timer service not started sudo systemctl start htop-monitor.timer
High CPU usage from monitoring Too frequent monitoring interval Increase interval in /opt/htop-monitor/config/monitor.json
Analysis script shows no data No historical data available Wait for monitoring to run and collect data over time

Next steps

Running this in production?

Want this handled for you? Setting up advanced monitoring once is straightforward. Keeping it tuned, integrated with your alerting infrastructure, and responding to the alerts 24/7 is the harder part. See how we run infrastructure like this for European SaaS and e-commerce teams.

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We handle managed devops services for businesses that depend on uptime. From initial setup to ongoing operations.