Automate Zabbix 7 monitoring tasks with Python scripts using the API. Set up host discovery, template assignment, custom monitoring scripts, and alerting automation for streamlined infrastructure management.
Prerequisites
- Zabbix 7 server installed and accessible
- Python 3.8+ environment
- Zabbix API user with Super Admin permissions
- Network connectivity to monitored hosts
What this solves
Zabbix API automation eliminates repetitive manual tasks in monitoring infrastructure. You can programmatically add hosts, assign templates, create custom checks, and configure alerts without clicking through the web interface. This approach scales monitoring operations and integrates Zabbix with your existing DevOps workflows.
Step-by-step configuration
Install Python dependencies
Install the required Python packages for Zabbix API interaction and HTTP requests.
sudo apt update
sudo apt install -y python3 python3-pip python3-venv
python3 -m venv zabbix-automation
source zabbix-automation/bin/activate
pip install pyzabbix requests
Create API connection script
Set up the base connection module that handles authentication and API requests to your Zabbix server. Save it as /home/user/zabbix-scripts/zabbix_api.py; the other scripts import it under that name.
#!/usr/bin/env python3
import json
from pyzabbix import ZabbixAPI
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class ZabbixConnection:
    """Hold the credentials for, and the session of, one Zabbix API login."""

    def __init__(self, url, username, password):
        """Store connection details; no network traffic happens here."""
        self.url = url
        self.username = username
        self.password = password
        # Authenticated API object, or None while no session is open.
        self.api = None

    def connect(self):
        """Open a session and log in.

        Returns:
            True on success; False after logging the error on any failure.
            ``self.api`` is only set when the login succeeded.
        """
        try:
            api = ZabbixAPI(self.url)
            api.login(self.username, self.password)
        except Exception as e:
            logger.error(f"Failed to connect to Zabbix API: {e}")
            # Never leave a half-initialised, unauthenticated object behind.
            self.api = None
            return False
        self.api = api
        logger.info(f"Connected to Zabbix API: {self.url}")
        return True

    def disconnect(self):
        """Log out and forget the session; safe to call when not connected.

        Callers invoke this from ``finally`` blocks, so a failing logout is
        logged instead of raised (raising would replace their return value).
        """
        if self.api is None:
            return
        try:
            self.api.user.logout()
            logger.info("Disconnected from Zabbix API")
        except Exception as e:
            logger.warning(f"Failed to log out of Zabbix API cleanly: {e}")
        finally:
            # A logged-out session must not look like a live one.
            self.api = None

    def get_api(self):
        """Return the API object (None when disconnected)."""
        return self.api
Create configuration file
Store your Zabbix server connection details in a separate configuration file for security and flexibility. Save it as /home/user/zabbix-scripts/config.json.
{
"zabbix": {
"url": "http://your-zabbix-server/zabbix",
"username": "api-user",
"password": "your-secure-password"
},
"monitoring": {
"default_group": "Linux servers",
"default_templates": [
"Linux by Zabbix agent",
"Linux filesystems by Zabbix agent"
]
},
"alerts": {
"email_media_type": "Email",
"severity_threshold": 3
}
}
Set secure file permissions
Protect the configuration file containing credentials from unauthorized access.
chmod 600 /home/user/zabbix-scripts/config.json
chmod +x /home/user/zabbix-scripts/zabbix_api.py
Create host discovery automation script
Implement automated host discovery and registration with template assignment and group membership. Save this script as /home/user/zabbix-scripts/host_discovery.py.
#!/usr/bin/env python3
import json
import sys
import argparse
from zabbix_api import ZabbixConnection
import logging
logger = logging.getLogger(__name__)


class HostManager:
    """Register hosts in Zabbix using defaults from a JSON config file."""

    def __init__(self, config_file):
        """Load *config_file* and prepare (but do not open) the API connection."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.zabbix = ZabbixConnection(
            self.config['zabbix']['url'],
            self.config['zabbix']['username'],
            self.config['zabbix']['password']
        )

    def add_host(self, hostname, ip_address, groups=None, templates=None, port=10050):
        """Add a new host with one agent interface.

        Args:
            hostname: Technical and visible name of the host.
            ip_address: IP used for the agent interface.
            groups: Host group names; missing groups are created. Defaults to
                ``monitoring.default_group`` from the config.
            templates: Template names; unknown ones are skipped with a
                warning. Defaults to ``monitoring.default_templates``.
            port: Agent port.

        Returns:
            The new host ID, or False if the connection failed, the host
            already exists, or the API call raised.
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            # Check for a duplicate first so that a rejected request does not
            # create host groups as a side effect.
            if api.host.get(filter={'host': hostname}):
                logger.warning(f"Host {hostname} already exists")
                return False

            group_ids = self._resolve_group_ids(
                api, groups or [self.config['monitoring']['default_group']]
            )
            template_ids = self._resolve_template_ids(
                api, templates or self.config['monitoring']['default_templates']
            )

            host_data = {
                'host': hostname,
                'name': hostname,
                'groups': group_ids,
                'templates': template_ids,
                'interfaces': [{
                    'type': 1,  # Agent interface
                    'main': 1,
                    'useip': 1,
                    'ip': ip_address,
                    'dns': '',
                    'port': str(port)
                }]
            }
            result = api.host.create(**host_data)
            logger.info(f"Successfully added host: {hostname} ({ip_address})")
            return result['hostids'][0]
        except Exception as e:
            logger.error(f"Failed to add host {hostname}: {e}")
            return False
        finally:
            self.zabbix.disconnect()

    @staticmethod
    def _resolve_group_ids(api, group_names):
        """Map group names to ``{'groupid': ...}`` dicts, creating missing groups."""
        group_ids = []
        for group_name in group_names:
            group = api.hostgroup.get(filter={'name': group_name})
            if group:
                group_ids.append({'groupid': group[0]['groupid']})
            else:
                created = api.hostgroup.create(name=group_name)
                group_ids.append({'groupid': created['groupids'][0]})
                logger.info(f"Created new host group: {group_name}")
        return group_ids

    @staticmethod
    def _resolve_template_ids(api, template_names):
        """Map template names to ``{'templateid': ...}`` dicts, skipping unknown ones."""
        template_ids = []
        for template_name in template_names:
            template = api.template.get(filter={'host': template_name})
            if template:
                template_ids.append({'templateid': template[0]['templateid']})
            else:
                logger.warning(f"Template not found: {template_name}")
        return template_ids

    def bulk_add_hosts(self, hosts_file):
        """Add every host listed in a JSON file.

        The file holds a list of objects with ``hostname`` and ``ip_address``
        and optional ``groups``, ``templates`` and ``port``.

        Returns:
            A list of ``{'hostname': ..., 'success': bool}`` dicts, one per
            entry. An entry missing a required field is reported as failed
            instead of aborting the remaining entries.
        """
        with open(hosts_file, 'r') as f:
            hosts = json.load(f)

        results = []
        for host in hosts:
            hostname = host.get('hostname')
            ip_address = host.get('ip_address')
            if not hostname or not ip_address:
                logger.error(f"Skipping host entry without hostname/ip_address: {host}")
                results.append({'hostname': hostname, 'success': False})
                continue
            result = self.add_host(
                hostname,
                ip_address,
                host.get('groups'),
                host.get('templates'),
                host.get('port', 10050)
            )
            results.append({'hostname': hostname, 'success': bool(result)})
        return results
def main():
    """Command-line entry point: add a single host or a batch from a file."""
    cli = argparse.ArgumentParser(description='Zabbix Host Discovery Automation')
    cli.add_argument('--config', default='config.json', help='Configuration file path')
    cli.add_argument('--hostname', help='Single hostname to add')
    cli.add_argument('--ip', help='IP address for single host')
    cli.add_argument('--bulk-file', help='JSON file with multiple hosts')
    opts = cli.parse_args()

    manager = HostManager(opts.config)

    # Bulk mode takes precedence over single-host mode.
    if opts.bulk_file:
        for entry in manager.bulk_add_hosts(opts.bulk_file):
            outcome = 'Success' if entry['success'] else 'Failed'
            print(f"Host {entry['hostname']}: {outcome}")
        return

    if opts.hostname and opts.ip:
        added = manager.add_host(opts.hostname, opts.ip)
        outcome = 'Success' if added else 'Failed'
        print(f"Host addition: {outcome}")
        return

    # Neither mode was fully specified.
    cli.print_help()


if __name__ == '__main__':
    main()
Create custom monitoring script
Implement custom monitoring checks that extend beyond standard Zabbix templates for application-specific metrics. Save this script as /home/user/zabbix-scripts/custom_monitoring.py.
#!/usr/bin/env python3
import json
import sys
from zabbix_api import ZabbixConnection
import logging
logger = logging.getLogger(__name__)
class CustomMonitoring:
    """Create custom items and triggers on hosts that already exist in Zabbix."""

    def __init__(self, config_file):
        """Read the JSON config and build the (not yet opened) API connection."""
        with open(config_file, 'r') as handle:
            self.config = json.load(handle)
        credentials = self.config['zabbix']
        self.zabbix = ZabbixConnection(
            credentials['url'],
            credentials['username'],
            credentials['password']
        )

    def create_custom_item(self, hostname, item_config):
        """Create one item on *hostname*.

        *item_config* must provide ``name`` and ``key``; ``type``,
        ``value_type``, ``delay``, ``history``, ``trends`` and ``description``
        are optional. Returns the new item ID, or False on any failure.
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            matches = api.host.get(filter={'host': hostname})
            if not matches:
                logger.error(f"Host not found: {hostname}")
                return False

            cfg = item_config
            created = api.item.create(
                hostid=matches[0]['hostid'],
                name=cfg['name'],
                key_=cfg['key'],
                type=cfg.get('type', 0),              # Zabbix agent
                value_type=cfg.get('value_type', 3),  # Numeric unsigned
                delay=cfg.get('delay', '60s'),
                history=cfg.get('history', '90d'),
                trends=cfg.get('trends', '365d'),
                description=cfg.get('description', ''),
            )
            logger.info(f"Created custom item: {item_config['name']} for host {hostname}")
            return created['itemids'][0]
        except Exception as e:
            logger.error(f"Failed to create item: {e}")
            return False
        finally:
            self.zabbix.disconnect()

    def create_custom_trigger(self, hostname, trigger_config):
        """Create one trigger from *trigger_config*.

        ``description`` and ``expression`` are required; the remaining keys
        fall back to the defaults below. *hostname* is accepted for symmetry
        with create_custom_item but is not used by this method. Returns the
        new trigger ID, or False on any failure.
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            cfg = trigger_config
            created = api.trigger.create(
                description=cfg['description'],
                expression=cfg['expression'],
                priority=cfg.get('priority', 3),            # Average severity
                status=cfg.get('status', 0),                # Enabled
                type=cfg.get('type', 0),                    # Single problem
                recovery_mode=cfg.get('recovery_mode', 0),  # Expression
                recovery_expression=cfg.get('recovery_expression', ''),
                comments=cfg.get('comments', ''),
            )
            logger.info(f"Created custom trigger: {trigger_config['description']}")
            return created['triggerids'][0]
        except Exception as e:
            logger.error(f"Failed to create trigger: {e}")
            return False
        finally:
            self.zabbix.disconnect()

    def setup_application_monitoring(self, hostname, app_name, metrics):
        """Create an item, and optionally a trigger, for every metric.

        Item keys are built as ``custom.<app_name>.<metric key>``. A metric's
        trigger expression is passed through ``str.format`` with ``hostname``
        and ``key`` available as placeholders.

        Returns a list of ``{'metric', 'item', 'trigger'}`` dicts where
        ``trigger`` is None for metrics that define no trigger.
        """
        summary = []
        for metric in metrics:
            item_config = {
                'name': f"{app_name} - {metric['name']}",
                'key': f"custom.{app_name}.{metric['key']}",
                'type': metric.get('type', 0),
                'value_type': metric.get('value_type', 3),
                'delay': metric.get('delay', '60s'),
                'description': metric.get('description', '')
            }
            item_id = self.create_custom_item(hostname, item_config)

            if 'trigger' not in metric:
                summary.append({
                    'metric': metric['name'],
                    'item': bool(item_id),
                    'trigger': None
                })
                continue

            spec = metric['trigger']
            expression = spec['expression'].format(
                hostname=hostname,
                key=item_config['key']
            )
            trigger_id = self.create_custom_trigger(hostname, {
                'description': f"{app_name} - {spec['description']}",
                'expression': expression,
                'priority': spec.get('priority', 3)
            })
            summary.append({
                'metric': metric['name'],
                'item': bool(item_id),
                'trigger': bool(trigger_id)
            })
        return summary
def main():
    """Example entry point: set up NGINX monitoring on one host.

    Optional ``--config``, ``--hostname`` and ``--app`` arguments select the
    target; their defaults reproduce the original hard-coded example.
    """
    import argparse  # local import: only this entry point needs it

    parser = argparse.ArgumentParser(description='Zabbix custom monitoring setup')
    parser.add_argument('--config', default='config.json', help='Configuration file path')
    parser.add_argument('--hostname', default='web-server-01', help='Target hostname')
    parser.add_argument('--app', default='nginx', help='Application name')
    args = parser.parse_args()

    custom_mon = CustomMonitoring(args.config)

    # Trigger expressions use the {hostname} and {key} placeholders that
    # setup_application_monitoring() fills in. Using {key} guarantees the
    # trigger references the item key that was actually created
    # ("custom.<app>.<metric key>") rather than a hand-typed copy of it.
    nginx_metrics = [
        {
            'name': 'Active Connections',
            'key': 'nginx.connections.active',
            'description': 'Number of active NGINX connections',
            'trigger': {
                'description': 'High number of active connections',
                'expression': 'last(/{hostname}/{key})>1000',
                'priority': 3
            }
        },
        {
            'name': 'Requests per Second',
            'key': 'nginx.requests.rate',
            'description': 'NGINX requests per second',
            'trigger': {
                'description': 'High request rate detected',
                'expression': 'last(/{hostname}/{key})>500',
                'priority': 2
            }
        }
    ]

    results = custom_mon.setup_application_monitoring(args.hostname, args.app, nginx_metrics)
    for result in results:
        print(f"Metric: {result['metric']} - Item: {result['item']}, Trigger: {result['trigger']}")


if __name__ == '__main__':
    main()
Create alerting automation script
Implement automated alert configuration with user notifications and escalation policies. Save this script as /home/user/zabbix-scripts/alert_automation.py.
#!/usr/bin/env python3
import json
from zabbix_api import ZabbixConnection
import logging
logger = logging.getLogger(__name__)


class AlertManager:
    """Create Zabbix user groups and trigger actions that deliver alerts."""

    # Trigger severity names mapped to the numeric values used by the API.
    SEVERITY_MAP = {
        'Not classified': 0,
        'Information': 1,
        'Warning': 2,
        'Average': 3,
        'High': 4,
        'Disaster': 5
    }

    # Fallback IDs used when a lookup fails or finds nothing.
    # NOTE(review): these match a stock installation; confirm for your server.
    DEFAULT_MEDIA_TYPE_ID = '1'
    DEFAULT_ADMIN_GROUP_ID = '7'

    def __init__(self, config_file):
        """Read the JSON config and build the (not yet opened) API connection."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.zabbix = ZabbixConnection(
            self.config['zabbix']['url'],
            self.config['zabbix']['username'],
            self.config['zabbix']['password']
        )

    def create_user_group(self, group_name, users=None):
        """Create a user group, optionally adding existing users by username.

        Returns:
            The ID of the new (or already existing) group, or False on failure.
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            existing_group = api.usergroup.get(filter={'name': group_name})
            if existing_group:
                logger.info(f"User group {group_name} already exists")
                return existing_group[0]['usrgrpid']

            group_data = {
                'name': group_name,
                'gui_access': 1,    # Internal authentication (2 would be LDAP)
                'users_status': 0,  # Enabled
                'debug_mode': 0     # Disabled
            }
            if users:
                members = []
                for username in users:
                    # The login-name field is "username"; "alias" was its
                    # pre-5.4 name and is rejected by current servers.
                    user = api.user.get(filter={'username': username})
                    if user:
                        members.append({'userid': user[0]['userid']})
                    else:
                        logger.warning(f"User not found, not added to group: {username}")
                # Members are passed as "users" (list of {'userid': ...}).
                group_data['users'] = members

            result = api.usergroup.create(**group_data)
            logger.info(f"Created user group: {group_name}")
            return result['usrgrpids'][0]
        except Exception as e:
            logger.error(f"Failed to create user group: {e}")
            return False
        finally:
            self.zabbix.disconnect()

    def create_action(self, action_name, conditions, operations):
        """Create a trigger action with the given filter conditions and operations.

        Message texts are not set on the action: the action-level
        ``def_*``/``r_*``/``recovery_msg`` fields were removed from the API,
        so operations use the media type's default messages
        (``default_msg: 1``). A recovery operation notifies everyone who was
        notified about the problem.

        Returns:
            The ID of the new (or already existing) action, or False on failure.
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            existing_action = api.action.get(filter={'name': action_name})
            if existing_action:
                logger.info(f"Action {action_name} already exists")
                return existing_action[0]['actionid']

            action_data = {
                'name': action_name,
                'eventsource': 0,  # Trigger events
                'status': 0,       # Enabled
                'esc_period': '1h',
                'filter': {
                    'evaltype': 0,  # AND/OR
                    'conditions': conditions
                },
                'operations': operations,
                'recovery_operations': [{
                    'operationtype': 11,  # Notify all involved
                    'opmessage': {'default_msg': 1}
                }]
            }
            result = api.action.create(**action_data)
            logger.info(f"Created action: {action_name}")
            return result['actionids'][0]
        except Exception as e:
            logger.error(f"Failed to create action: {e}")
            return False
        finally:
            self.zabbix.disconnect()

    def setup_email_notifications(self, severity_levels=None):
        """Create an action that e-mails the admin group for the given severities.

        Args:
            severity_levels: Severity names (keys of ``SEVERITY_MAP``);
                defaults to ``['High', 'Disaster']``. Each produces a
                "severity >= level" condition; unknown names are skipped
                with a warning.

        Returns:
            The action ID, or False if no valid severity was given or the
            API calls failed.
        """
        if not severity_levels:
            severity_levels = ['High', 'Disaster']

        conditions = []
        for severity in severity_levels:
            if severity not in self.SEVERITY_MAP:
                logger.warning(f"Ignoring unknown severity level: {severity}")
                continue
            conditions.append({
                'conditiontype': 4,  # Trigger severity
                'operator': 5,       # is greater than or equals
                'value': str(self.SEVERITY_MAP[severity])
            })
        if not conditions:
            # An empty filter would match every trigger event.
            logger.error("No valid severity levels given; action not created")
            return False

        media_type_name = self.config.get('alerts', {}).get('email_media_type', 'Email')

        # Resolve IDs in a dedicated session so that a stale session left over
        # from an earlier call is never reused.
        if not self.zabbix.connect():
            return False
        try:
            media_type_id = self._get_media_type_id(media_type_name)
            admin_group_id = self._get_admin_group_id()
        finally:
            self.zabbix.disconnect()

        operations = [{
            'operationtype': 0,  # Send message
            'esc_period': 0,
            'esc_step_from': 1,
            'esc_step_to': 1,
            'evaltype': 0,
            'opmessage': {
                'default_msg': 1,
                'mediatypeid': media_type_id
            },
            'opmessage_grp': [{
                'usrgrpid': admin_group_id
            }]
        }]

        action_name = f"Email notifications for {', '.join(severity_levels)} severity"
        return self.create_action(action_name, conditions, operations)

    def _get_media_type_id(self, media_type_name):
        """Return the media type ID for *media_type_name*, or the fallback ID."""
        if not self.zabbix.api:
            self.zabbix.connect()
        try:
            media_type = self.zabbix.api.mediatype.get(filter={'name': media_type_name})
        except Exception as e:
            logger.warning(f"Media type lookup failed ({e}); using default ID")
            return self.DEFAULT_MEDIA_TYPE_ID
        if media_type:
            return media_type[0]['mediatypeid']
        logger.warning(f"Media type '{media_type_name}' not found; using default ID")
        return self.DEFAULT_MEDIA_TYPE_ID

    def _get_admin_group_id(self):
        """Return the ID of the 'Zabbix administrators' group, or the fallback ID."""
        if not self.zabbix.api:
            self.zabbix.connect()
        try:
            admin_group = self.zabbix.api.usergroup.get(filter={'name': 'Zabbix administrators'})
        except Exception as e:
            logger.warning(f"Admin group lookup failed ({e}); using default ID")
            return self.DEFAULT_ADMIN_GROUP_ID
        if admin_group:
            return admin_group[0]['usrgrpid']
        logger.warning("Group 'Zabbix administrators' not found; using default ID")
        return self.DEFAULT_ADMIN_GROUP_ID
def main():
    """Example run: configure e-mail alerts, then create an operations group."""
    manager = AlertManager('config.json')

    # E-mail notifications for the two highest severities.
    action_id = manager.setup_email_notifications(['High', 'Disaster'])
    email_outcome = 'Success' if action_id else 'Failed'
    print(f"Email notifications setup: {email_outcome}")

    # Dedicated user group for the operations team.
    group_id = manager.create_user_group('Operations Team')
    group_outcome = 'Success' if group_id else 'Failed'
    print(f"Operations team group creation: {group_outcome}")


if __name__ == '__main__':
    main()
Create example hosts configuration
Create a sample hosts file for bulk host addition to test the automation scripts. Save it as /home/user/zabbix-scripts/sample_hosts.json.
[
{
"hostname": "web-server-01",
"ip_address": "203.0.113.10",
"groups": ["Web servers", "Production"],
"templates": ["Linux by Zabbix agent", "Apache by Zabbix agent"],
"port": 10050
},
{
"hostname": "db-server-01",
"ip_address": "203.0.113.11",
"groups": ["Database servers", "Production"],
"templates": ["Linux by Zabbix agent", "MySQL by Zabbix agent"],
"port": 10050
},
{
"hostname": "app-server-01",
"ip_address": "203.0.113.12",
"groups": ["Application servers", "Production"],
"templates": ["Linux by Zabbix agent"],
"port": 10050
}
]
Create automation runner script
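Create a main script that orchestrates all automation tasks and provides a simple interface for common operations. Save it as /home/user/zabbix-scripts/zabbix_automation.py, next to the host_discovery.py, custom_monitoring.py and alert_automation.py modules it imports.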
#!/usr/bin/env python3
import argparse
import json
import sys
from host_discovery import HostManager
from custom_monitoring import CustomMonitoring
from alert_automation import AlertManager
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def run_host_discovery(config_file, hosts_file=None, hostname=None, ip_address=None):
    """Add hosts in bulk from *hosts_file*, or a single *hostname*/*ip_address*.

    Returns the per-host result list in bulk mode, a bool in single-host
    mode, and False when neither mode is fully specified.
    """
    manager = HostManager(config_file)

    if hosts_file:
        logger.info(f"Running bulk host discovery from {hosts_file}")
        outcomes = manager.bulk_add_hosts(hosts_file)
        for entry in outcomes:
            status = "Success" if entry['success'] else "Failed"
            logger.info(f"Host {entry['hostname']}: {status}")
        return outcomes

    if not (hostname and ip_address):
        logger.error("Either provide hosts file or hostname/ip for single host")
        return False

    logger.info(f"Adding single host: {hostname} ({ip_address})")
    return bool(manager.add_host(hostname, ip_address))
def setup_monitoring(config_file, hostname, app_name, metrics_file=None):
    """Set up custom monitoring for an application on one host.

    Args:
        config_file: Path to the JSON configuration file.
        hostname: Host that receives the items and triggers.
        app_name: Application name, used in item names and keys.
        metrics_file: Optional JSON file with metric definitions; when
            omitted, a single default "Active Connections" metric is used.

    Returns:
        The result list from ``setup_application_monitoring``.
    """
    custom_mon = CustomMonitoring(config_file)
    if metrics_file:
        with open(metrics_file, 'r') as f:
            metrics = json.load(f)
    else:
        # Default NGINX monitoring metrics. The trigger expression uses the
        # {key} placeholder so it always references the item key that is
        # actually created ("custom.<app_name>.<metric key>"), whatever
        # app_name is.
        metrics = [
            {
                'name': 'Active Connections',
                'key': 'nginx.connections.active',
                'description': 'Number of active NGINX connections',
                'trigger': {
                    'description': 'High number of active connections',
                    'expression': 'last(/{hostname}/{key})>1000',
                    'priority': 3
                }
            }
        ]

    logger.info(f"Setting up monitoring for {app_name} on {hostname}")
    results = custom_mon.setup_application_monitoring(hostname, app_name, metrics)
    for result in results:
        item_status = "Success" if result['item'] else "Failed"
        # 'trigger' is None when the metric defines no trigger.
        if result['trigger'] is None:
            trigger_status = "None"
        else:
            trigger_status = "Success" if result['trigger'] else "Failed"
        logger.info(f"Metric {result['metric']}: Item={item_status}, Trigger={trigger_status}")
    return results
def setup_alerts(config_file, severity_levels=None):
    """Configure e-mail alerting and report whether it succeeded."""
    manager = AlertManager(config_file)
    levels = severity_levels if severity_levels else ['High', 'Disaster']

    logger.info(f"Setting up alerts for severity levels: {levels}")
    succeeded = bool(manager.setup_email_notifications(levels))

    if not succeeded:
        logger.error("Alert configuration failed")
    else:
        logger.info("Alert configuration completed successfully")
    return succeeded
def _all_hosts_added(result):
    """Return True when a run_host_discovery() result means every host succeeded."""
    if isinstance(result, list):
        # A non-empty list is always truthy, so inspect the entries instead.
        return bool(result) and all(entry['success'] for entry in result)
    return bool(result)


def main():
    """Command-line entry point for the automation suite.

    Sub-commands: ``hosts``, ``monitor``, ``alerts`` and ``complete``.
    The process exit code is 0 on success and 1 on failure.
    """
    parser = argparse.ArgumentParser(description='Zabbix Automation Suite')
    parser.add_argument('--config', default='config.json', help='Configuration file path')
    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    # Host discovery command
    host_parser = subparsers.add_parser('hosts', help='Host discovery automation')
    host_parser.add_argument('--bulk-file', help='JSON file with multiple hosts')
    host_parser.add_argument('--hostname', help='Single hostname to add')
    host_parser.add_argument('--ip', help='IP address for single host')

    # Monitoring setup command
    monitor_parser = subparsers.add_parser('monitor', help='Custom monitoring setup')
    monitor_parser.add_argument('--hostname', required=True, help='Target hostname')
    monitor_parser.add_argument('--app', required=True, help='Application name')
    monitor_parser.add_argument('--metrics-file', help='JSON file with metrics definition')

    # Alerting setup command
    alert_parser = subparsers.add_parser('alerts', help='Automated alerting setup')
    alert_parser.add_argument('--severity', nargs='+', default=['High', 'Disaster'],
                              help='Severity levels for alerts')

    # Complete setup command
    complete_parser = subparsers.add_parser('complete', help='Complete monitoring setup')
    complete_parser.add_argument('--hosts-file', required=True, help='JSON file with hosts')
    complete_parser.add_argument('--setup-alerts', action='store_true', help='Set up alerting')

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)

    if args.command == 'hosts':
        result = run_host_discovery(args.config, args.bulk_file, args.hostname, args.ip)
        sys.exit(0 if _all_hosts_added(result) else 1)

    elif args.command == 'monitor':
        result = setup_monitoring(args.config, args.hostname, args.app, args.metrics_file)
        # A trigger value of None means "no trigger requested", not a failure.
        succeeded = all(r['item'] and r['trigger'] is not False for r in result)
        sys.exit(0 if succeeded else 1)

    elif args.command == 'alerts':
        result = setup_alerts(args.config, args.severity)
        sys.exit(0 if result else 1)

    elif args.command == 'complete':
        logger.info("Starting complete monitoring setup")

        host_result = run_host_discovery(args.config, args.hosts_file)
        if not host_result:
            logger.error("Host discovery failed")
            sys.exit(1)
        # Individual failures do not abort the run (on a repeated run hosts
        # that are already registered are reported as failed), but they are
        # made visible.
        not_added = [str(r['hostname']) for r in host_result if not r['success']]
        if not_added:
            logger.warning(f"Hosts not added (already registered or failed): {', '.join(not_added)}")

        if args.setup_alerts:
            alert_result = setup_alerts(args.config)
            if not alert_result:
                logger.error("Alert setup failed")
                sys.exit(1)

        logger.info("Complete monitoring setup finished successfully")
        sys.exit(0)


if __name__ == '__main__':
    main()
Make scripts executable
Set proper executable permissions for all automation scripts.
chmod +x /home/user/zabbix-scripts/*.py
# Only the sample data may be world-readable; config.json holds the API
# password and must stay private (a blanket "chmod 644 *.json" would undo
# the earlier chmod 600).
chmod 644 /home/user/zabbix-scripts/sample_hosts.json
chmod 600 /home/user/zabbix-scripts/config.json
Verify your setup
Test API connection
Verify that your scripts can successfully connect to the Zabbix API.
cd /home/user/zabbix-scripts
source ../zabbix-automation/bin/activate
python3 -c "from zabbix_api import ZabbixConnection; import json; config = json.load(open('config.json')); conn = ZabbixConnection(config['zabbix']['url'], config['zabbix']['username'], config['zabbix']['password']); print('Connection successful' if conn.connect() else 'Connection failed')"
Test host discovery
Run a test with the sample hosts file to verify host addition functionality.
python3 zabbix_automation.py hosts --bulk-file sample_hosts.json
Test alert setup
Configure automated alerting for high severity events.
python3 zabbix_automation.py alerts --severity High Disaster
Check Zabbix web interface
Verify that hosts, items, and actions were created correctly in the Zabbix web interface.
curl -k https://your-zabbix-server/zabbix
echo "Check Configuration -> Hosts, Items, and Actions sections"
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Authentication failed | Incorrect credentials or URL | Verify config.json credentials and Zabbix URL |
| Permission denied errors | API user lacks necessary permissions | Grant Super Admin role to API user in Zabbix |
| Template not found | Template name mismatch | Check exact template names in Zabbix web interface |
| Host group creation fails | Insufficient permissions | Ensure API user can create host groups |
| Import errors | Missing Python modules | pip install pyzabbix requests in virtual environment |
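| SSL certificate errors | Self-signed or untrusted certificate on the Zabbix server | Install a certificate signed by a trusted CA (or add your CA to the system trust store) and use the matching https:// URL in config.json |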
Automate with systemd timers
Create monitoring service
Set up a systemd service for periodic monitoring tasks. Save it as /etc/systemd/system/zabbix-automation.service.
# One-shot unit that runs the complete automation once per activation.
[Unit]
Description=Zabbix Automation Tasks
After=network.target
[Service]
Type=oneshot
# NOTE(review): config.json was restricted to mode 600 earlier in this guide;
# confirm that this account owns (or can read) the files under
# WorkingDirectory, otherwise the job cannot load its credentials.
User=zabbix
# Relative paths on the command line (sample_hosts.json and the default
# config.json) are resolved against this directory.
WorkingDirectory=/home/user/zabbix-scripts
Environment=PATH=/home/user/zabbix-automation/bin
ExecStart=/home/user/zabbix-automation/bin/python3 /home/user/zabbix-scripts/zabbix_automation.py complete --hosts-file sample_hosts.json --setup-alerts
[Install]
WantedBy=multi-user.target
Create timer for automation
Configure a systemd timer to run the automation tasks periodically. Save it as /etc/systemd/system/zabbix-automation.timer.
[Unit]
Description=Run Zabbix automation tasks
# No Requires= on the service: it would start the job every time the timer
# unit itself is started (for example at boot), not only on schedule.

[Timer]
# A timer activates the service with the same base name by default
# (zabbix-automation.service), so no Unit= line is needed.
OnCalendar=daily
RandomizedDelaySec=300
Persistent=true

[Install]
WantedBy=timers.target
Enable automation timer
Enable and start the systemd timer for automated execution.
sudo systemctl daemon-reload
sudo systemctl enable zabbix-automation.timer
sudo systemctl start zabbix-automation.timer
sudo systemctl status zabbix-automation.timer
Integration examples
Integrate with CI/CD pipeline
Add Zabbix monitoring to deployment pipelines for automatic infrastructure monitoring.
zabbix_monitoring:
  stage: deploy
  script:
    - source zabbix-automation/bin/activate
    # The scripts read config.json relative to the working directory.
    - cd zabbix-scripts
    # Use the runner's sub-commands: "monitor" accepts --hostname/--app,
    # which custom_monitoring.py does not parse when run directly.
    - python3 zabbix_automation.py hosts --hostname "$CI_ENVIRONMENT_NAME-$CI_COMMIT_SHORT_SHA" --ip "$DEPLOY_IP"
    - python3 zabbix_automation.py monitor --hostname "$CI_ENVIRONMENT_NAME-$CI_COMMIT_SHORT_SHA" --app "$APPLICATION_NAME"
  only:
    - main
Create Ansible playbook integration
Integrate Zabbix automation with Ansible for infrastructure-as-code deployments.
---
- name: Deploy and monitor with Zabbix
  hosts: all
  tasks:
    - name: Add host to Zabbix
      uri:
        url: "{{ zabbix_api_url }}"
        method: POST
        headers:
          Content-Type: application/json
          # Token goes in the Authorization header; the "auth" field in the
          # JSON-RPC body is deprecated.
          Authorization: "Bearer {{ zabbix_auth_token }}"
        body_format: json
        body:
          jsonrpc: "2.0"
          method: "host.create"
          params:
            host: "{{ inventory_hostname }}"
            interfaces:
              - type: 1
                main: 1
                useip: 1
                ip: "{{ ansible_default_ipv4.address }}"
                dns: ""
                port: "10050"
            groups:
              - groupid: "{{ zabbix_group_id }}"
          id: 1
      register: zabbix_result
      # JSON-RPC errors are returned with HTTP 200, so check the body.
      failed_when: zabbix_result.json is not defined or 'error' in zabbix_result.json
      delegate_to: localhost
Next steps
- Configure Zabbix SNMP monitoring for network devices with automated discovery and templates
- Set up Zabbix 7 high availability cluster with PostgreSQL replication and automated failover
- Integrate Zabbix 7 with network automation and orchestration tools using Ansible and Python APIs
- Configure Zabbix custom dashboards and automated reporting for executive monitoring
- Set up Zabbix distributed monitoring architecture with proxies and scalable data collection
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output (ANSI escape sequences, interpreted by `echo -e`)
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'  # reset to the terminal's default color
# Configuration
INSTALL_DIR="/opt/zabbix-automation"  # scripts, config file and virtualenv live here
SERVICE_USER="zabbix-api"             # system account that owns INSTALL_DIR
VENV_NAME="zabbix-automation"         # virtualenv directory name inside INSTALL_DIR
CONFIG_FILE="$INSTALL_DIR/config.json"
# Default values (filled in from the command line by parse_args)
ZABBIX_URL=""
ZABBIX_USER=""
ZABBIX_PASS=""
# Print the command-line synopsis and terminate with status 1.
usage() {
    cat << EOF
Usage: $0 --url <zabbix-url> --user <username> --password <password>
Example: $0 --url http://zabbix.example.com/zabbix --user admin --password password
EOF
    exit 1
}
# ERR-trap handler: roll back a failed installation by removing the service
# account and the installation directory.
# NOTE(review): this removes SERVICE_USER and INSTALL_DIR even if they existed
# before this run (e.g. when re-running the installer over an existing
# install) - confirm that this is intended.
cleanup() {
echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
if id "$SERVICE_USER" &>/dev/null; then
# "|| true": a failing removal must not abort the rest of the cleanup.
userdel -r "$SERVICE_USER" 2>/dev/null || true
fi
rm -rf "$INSTALL_DIR" 2>/dev/null || true
}
# Run cleanup whenever a command fails.
trap cleanup ERR
# Logging helpers: print a colored severity tag followed by the message ($1).
# printf's %b expands backslash escapes the same way `echo -e` does.
log_info() {
    printf '%b\n' "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    printf '%b\n' "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1"
}
# Abort unless the script is running with an effective UID of 0.
check_root() {
    if (( EUID == 0 )); then
        return 0
    fi
    log_error "This script must be run as root"
    exit 1
}
# Identify the distribution from /etc/os-release and set the package-manager
# variables (PKG_MGR, PKG_UPDATE, PKG_INSTALL, PYTHON_PKG) used by
# install_dependencies. Exits with status 1 on unknown distributions.
detect_distro() {
    if [ ! -f /etc/os-release ]; then
        log_error "Cannot detect distribution"
        exit 1
    fi

    . /etc/os-release
    # ID may be missing from a minimal os-release; avoid tripping `set -u`.
    local distro_id="${ID:-unknown}"

    case "$distro_id" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_UPDATE="apt update"
            PKG_INSTALL="apt install -y"
            PYTHON_PKG="python3 python3-pip python3-venv"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            PKG_MGR="dnf"
            # Refresh metadata only, like `apt update`; an installer should
            # not upgrade every package on the system as a side effect.
            PKG_UPDATE="dnf makecache"
            PKG_INSTALL="dnf install -y"
            PYTHON_PKG="python3 python3-pip"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_UPDATE="yum makecache"
            PKG_INSTALL="yum install -y"
            PYTHON_PKG="python3 python3-pip"
            ;;
        *)
            log_error "Unsupported distro: $distro_id"
            exit 1
            ;;
    esac

    log_info "Detected distribution: $distro_id"
}
# Parse --url/--user/--password into ZABBIX_URL/ZABBIX_USER/ZABBIX_PASS.
# Prints usage and exits on unknown options, on an option given without a
# value, or when any of the three settings is missing.
parse_args() {
    while [[ $# -gt 0 ]]; do
        case "$1" in
            --url|--user|--password)
                # Without this check a trailing option would die with an
                # opaque "unbound variable" error under `set -u`.
                if [[ $# -lt 2 || -z "${2:-}" ]]; then
                    log_error "Option $1 requires a value"
                    usage
                fi
                case "$1" in
                    --url)      ZABBIX_URL="$2" ;;
                    --user)     ZABBIX_USER="$2" ;;
                    --password) ZABBIX_PASS="$2" ;;
                esac
                shift 2
                ;;
            -h|--help)
                usage
                ;;
            *)
                log_error "Unknown option: $1"
                usage
                ;;
        esac
    done

    if [[ -z "$ZABBIX_URL" || -z "$ZABBIX_USER" || -z "$ZABBIX_PASS" ]]; then
        log_error "All parameters are required"
        usage
    fi
}
# Step 1: refresh package metadata and install Python plus curl.
install_dependencies() {
    printf '%s\n' "[1/8] Installing system dependencies..."
    # The PKG_* variables hold whole command lines and package lists, so they
    # are expanded unquoted on purpose to let the shell split them into words.
    $PKG_UPDATE
    $PKG_INSTALL $PYTHON_PKG curl
    log_info "System dependencies installed"
}
# Step 2: create the system account that owns the installation, unless it
# already exists.
create_user() {
    echo "[2/8] Creating service user..."
    if id "$SERVICE_USER" &>/dev/null; then
        log_warn "User $SERVICE_USER already exists"
        return 0
    fi
    # System account (-r) without a login shell, home set to INSTALL_DIR.
    useradd -r -s /bin/false -d "$INSTALL_DIR" -c "Zabbix API Automation" "$SERVICE_USER"
    log_info "Created user: $SERVICE_USER"
}
# Step 3: create the installation directory and hand it to the service user.
setup_directory() {
    local owner="$SERVICE_USER:$SERVICE_USER"

    echo "[3/8] Setting up installation directory..."
    mkdir -p "$INSTALL_DIR"
    chown "$owner" "$INSTALL_DIR"
    chmod 755 "$INSTALL_DIR"
    log_info "Created directory: $INSTALL_DIR"
}
# Step 4: build the virtualenv as the service user and install the Python
# packages into it.
setup_python_env() {
    local pip_bin="$INSTALL_DIR/$VENV_NAME/bin/pip"

    echo "[4/8] Setting up Python virtual environment..."
    # VENV_NAME is a relative path, so the venv is created inside INSTALL_DIR.
    cd "$INSTALL_DIR"
    sudo -u "$SERVICE_USER" python3 -m venv "$VENV_NAME"
    sudo -u "$SERVICE_USER" "$pip_bin" install --upgrade pip
    sudo -u "$SERVICE_USER" "$pip_bin" install pyzabbix requests
    log_info "Python virtual environment created and packages installed"
}
# Step 5: write the zabbix_api.py connection module into INSTALL_DIR.
# The here-document is quoted ('EOF'), so the Python source is written
# verbatim with no shell expansion.
create_api_script() {
    echo "[5/8] Creating Zabbix API connection script..."
    cat > "$INSTALL_DIR/zabbix_api.py" << 'EOF'
#!/usr/bin/env python3
import json
from pyzabbix import ZabbixAPI
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class ZabbixConnection:
    """Hold the credentials for, and the session of, one Zabbix API login."""

    def __init__(self, url, username, password):
        self.url = url
        self.username = username
        self.password = password
        # Authenticated API object, or None while no session is open.
        self.api = None

    def connect(self):
        """Establish connection to Zabbix API; return True on success."""
        try:
            api = ZabbixAPI(self.url)
            api.login(self.username, self.password)
        except Exception as e:
            logger.error(f"Failed to connect to Zabbix API: {e}")
            self.api = None
            return False
        self.api = api
        logger.info(f"Connected to Zabbix API: {self.url}")
        return True

    def disconnect(self):
        """Close API connection; never raises, safe when not connected."""
        if self.api is None:
            return
        try:
            self.api.user.logout()
            logger.info("Disconnected from Zabbix API")
        except Exception as e:
            logger.warning(f"Failed to log out of Zabbix API cleanly: {e}")
        finally:
            # A logged-out session must not look like a live one.
            self.api = None

    def get_api(self):
        """Return the API object (None when disconnected)."""
        return self.api
EOF
    chown "$SERVICE_USER:$SERVICE_USER" "$INSTALL_DIR/zabbix_api.py"
    chmod 644 "$INSTALL_DIR/zabbix_api.py"
    log_info "Created Zabbix API connection script"
}
# Step 6: write config.json with the Zabbix credentials.
# The JSON is produced by Python's json module so that quotes, backslashes
# and other special characters in the URL, user name or password are escaped
# correctly. The file is created empty with mode 600 *before* the secret is
# written, so it is never readable by other users.
create_config_file() {
    echo "[6/8] Creating configuration file..."
    install -m 600 -o "$SERVICE_USER" -g "$SERVICE_USER" /dev/null "$CONFIG_FILE"
    ZBX_URL="$ZABBIX_URL" ZBX_USER="$ZABBIX_USER" ZBX_PASS="$ZABBIX_PASS" \
        python3 - > "$CONFIG_FILE" << 'EOF'
import json
import os

config = {
    "zabbix": {
        "url": os.environ["ZBX_URL"],
        "username": os.environ["ZBX_USER"],
        "password": os.environ["ZBX_PASS"],
    },
    "monitoring": {
        "default_group": "Linux servers",
        "default_templates": [
            "Linux by Zabbix agent",
            "Linux filesystems by Zabbix agent",
        ],
    },
    "alerts": {
        "email_media_type": "Email",
        "severity_threshold": 3,
    },
}
print(json.dumps(config, indent=4))
EOF
    # Re-assert ownership and mode in case the file already existed.
    chown "$SERVICE_USER:$SERVICE_USER" "$CONFIG_FILE"
    chmod 600 "$CONFIG_FILE"
    log_info "Created configuration file with secure permissions"
}
# Step 7: write host_manager.py and a small wrapper command, and link the
# wrapper into /usr/local/bin.
# The first here-document is quoted ('EOF'), so the Python source is written
# verbatim; the wrapper here-document is unquoted so INSTALL_DIR and
# VENV_NAME are expanded at install time.
create_host_manager() {
    echo "[7/8] Creating host management script..."
    cat > "$INSTALL_DIR/host_manager.py" << 'EOF'
#!/usr/bin/env python3
import json
import sys
import os
import argparse

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(SCRIPT_DIR)

from zabbix_api import ZabbixConnection
import logging

logger = logging.getLogger(__name__)


class HostManager:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.zabbix = ZabbixConnection(
            self.config['zabbix']['url'],
            self.config['zabbix']['username'],
            self.config['zabbix']['password']
        )

    def add_host(self, hostname, ip_address, groups=None, templates=None, port=10050):
        """Add a new host to Zabbix monitoring.

        Returns True when the host was created or already exists, False on
        any failure (including a host group that does not exist).
        """
        if not self.zabbix.connect():
            return False
        api = self.zabbix.get_api()
        try:
            # Check if host already exists
            existing_hosts = api.host.get(filter={"host": hostname})
            if existing_hosts:
                logger.warning(f"Host {hostname} already exists")
                return True

            # Get default groups if not specified
            if not groups:
                groups = [self.config['monitoring']['default_group']]

            # Get group IDs
            group_ids = []
            for group_name in groups:
                group_result = api.hostgroup.get(filter={"name": group_name})
                if group_result:
                    group_ids.append({"groupid": group_result[0]['groupid']})
                else:
                    logger.error(f"Group '{group_name}' not found")
                    return False

            # Get template IDs
            template_ids = []
            if not templates:
                templates = self.config['monitoring']['default_templates']
            for template_name in templates:
                template_result = api.template.get(filter={"host": template_name})
                if template_result:
                    template_ids.append({"templateid": template_result[0]['templateid']})
                else:
                    logger.warning(f"Template '{template_name}' not found, skipping")

            # Create host
            host_data = {
                "host": hostname,
                "interfaces": [{
                    "type": 1,
                    "main": 1,
                    "useip": 1,
                    "ip": ip_address,
                    "dns": "",
                    "port": str(port)
                }],
                "groups": group_ids,
                "templates": template_ids
            }
            result = api.host.create(host_data)
            logger.info(f"Successfully added host {hostname} with ID: {result['hostids'][0]}")
            return True
        except Exception as e:
            logger.error(f"Failed to add host {hostname}: {e}")
            return False
        finally:
            self.zabbix.disconnect()


def main():
    parser = argparse.ArgumentParser(description='Manage Zabbix hosts')
    parser.add_argument('--hostname', required=True, help='Host name')
    parser.add_argument('--ip', required=True, help='IP address')
    # Default to the config file installed next to this script, so the
    # default stays correct if the installation directory is changed.
    parser.add_argument('--config', default=os.path.join(SCRIPT_DIR, 'config.json'),
                        help='Config file path')
    args = parser.parse_args()

    manager = HostManager(args.config)
    success = manager.add_host(args.hostname, args.ip)
    if success:
        print(f"Successfully processed host: {args.hostname}")
        sys.exit(0)
    else:
        print(f"Failed to process host: {args.hostname}")
        sys.exit(1)


if __name__ == "__main__":
    main()
EOF
    chown "$SERVICE_USER:$SERVICE_USER" "$INSTALL_DIR/host_manager.py"
    chmod 755 "$INSTALL_DIR/host_manager.py"

    # Create wrapper script ("\$@" is escaped so it reaches the wrapper as-is)
    cat > "$INSTALL_DIR/zabbix-host-manager" << EOF
#!/bin/bash
cd "$INSTALL_DIR"
exec ./$VENV_NAME/bin/python host_manager.py "\$@"
EOF
    chown "$SERVICE_USER:$SERVICE_USER" "$INSTALL_DIR/zabbix-host-manager"
    chmod 755 "$INSTALL_DIR/zabbix-host-manager"
    ln -sf "$INSTALL_DIR/zabbix-host-manager" /usr/local/bin/zabbix-host-manager
    log_info "Created host management script"
}
# Step 8: verify the Python packages, the config file permissions and (as a
# non-fatal check) the API login, then print usage examples.
verify_installation() {
    echo "[8/8] Verifying installation..."
    local venv_python="$INSTALL_DIR/$VENV_NAME/bin/python"

    # Check Python environment
    if sudo -u "$SERVICE_USER" "$venv_python" -c "import pyzabbix, requests" 2>/dev/null; then
        log_info "Python dependencies verified"
    else
        log_error "Python dependencies verification failed"
        exit 1
    fi

    # Check file permissions
    if [[ "$(stat -c %a "$CONFIG_FILE")" == "600" ]]; then
        log_info "Configuration file permissions verified"
    else
        log_error "Configuration file permissions incorrect"
        exit 1
    fi

    # Test API connection. Paths are passed as arguments instead of being
    # spliced into the Python source, so they follow INSTALL_DIR/CONFIG_FILE
    # and cannot break the quoting.
    if sudo -u "$SERVICE_USER" "$venv_python" - "$INSTALL_DIR" "$CONFIG_FILE" 2>/dev/null << 'EOF'
import json
import sys

install_dir, config_file = sys.argv[1:3]
sys.path.append(install_dir)
from zabbix_api import ZabbixConnection

with open(config_file, 'r') as f:
    config = json.load(f)

zabbix = ZabbixConnection(
    config['zabbix']['url'],
    config['zabbix']['username'],
    config['zabbix']['password']
)
if zabbix.connect():
    print('API connection successful')
    zabbix.disconnect()
    sys.exit(0)
sys.exit(1)
EOF
    then
        log_info "Zabbix API connection verified"
    else
        # Not fatal: the server may simply be unreachable from this machine.
        log_warn "Could not verify API connection - check your credentials and URL"
    fi

    echo -e "${GREEN}Installation completed successfully!${NC}"
    echo "Usage examples:"
    echo "  zabbix-host-manager --hostname server01 --ip 192.168.1.100"
    echo "  sudo -u $SERVICE_USER $INSTALL_DIR/$VENV_NAME/bin/python $INSTALL_DIR/host_manager.py --hostname server01 --ip 192.168.1.100"
}
# Run all installation steps in order.
main() {
    # Parse arguments first so that --help and usage errors are reported
    # without requiring root or a supported distribution.
    parse_args "$@"
    check_root
    detect_distro
    install_dependencies
    create_user
    setup_directory
    setup_python_env
    create_api_script
    create_config_file
    create_host_manager
    verify_installation
}

main "$@"
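Review the script before running. It must be run as root and requires all three options, for example: sudo bash install.sh --url http://zabbix.example.com/zabbix --user api-user --password 'your-secure-password'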