Build an automated network discovery system that uses SNMP and LLDP protocols to map your infrastructure topology, detect device relationships, and create visual network diagrams with real-time monitoring integration.
Prerequisites
- Root access to server
- Network devices with SNMP enabled
- Python 3.8+
- Network connectivity to target devices
What this solves
Network topology discovery helps you automatically map your infrastructure by scanning devices, detecting their relationships through SNMP and LLDP protocols, and creating visual representations of your network. This approach eliminates manual network documentation, reduces configuration drift detection time, and provides real-time visibility into your infrastructure changes.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you have the latest security patches and package versions.
sudo apt update && sudo apt upgrade -y
Install SNMP tools and LLDP utilities
Install the core SNMP utilities, LLDP daemon, and network scanning tools needed for device discovery and protocol communication.
sudo apt install -y snmp snmp-mibs-downloader lldpd nmap python3-pip python3-venv git
Configure SNMP daemon
Set up the SNMP daemon with proper community strings and access controls for network device communication.
# Community string configuration
rocommunity public 127.0.0.1
rocommunity netdiscovery 10.0.0.0/8
rocommunity netdiscovery 172.16.0.0/12
rocommunity netdiscovery 192.168.0.0/16
System information
sysLocation "Network Operations Center"
sysContact "admin@example.com"
sysServices 72
Access control
com2sec readonly default netdiscovery
group MyROGroup v1 readonly
group MyROGroup v2c readonly
OID access restrictions
view all included .1.3.6.1.2.1.1
view all included .1.3.6.1.2.1.2
view all included .1.3.6.1.2.1.4
view all included .1.3.6.1.4.1
access MyROGroup "" any noauth exact all none none
Disable SNMP v1/v2c write access
rwcommunity private 127.0.0.1
Configure LLDP daemon
Enable LLDP on network interfaces to discover directly connected devices and their capabilities.
# Configure LLDP daemon
configure lldp tx-interval 30
configure lldp tx-hold 4
configure system hostname discovery-server
configure system description "Network Discovery Server"
configure system platform "Linux"
Enable CDP compatibility for Cisco devices
configure lldp custom-tlv oui 00,00,0c subtype 1
Interface configuration
configure ports eth0 lldp portidsubtype local eth0
configure med fast-start enable
Enable and start services
Start the SNMP and LLDP daemons and enable them to start automatically on boot.
sudo systemctl enable --now snmpd
sudo systemctl enable --now lldpd
sudo systemctl status snmpd lldpd
Create Python virtual environment
Set up an isolated Python environment for the network discovery scripts and required libraries.
sudo mkdir -p /opt/network-discovery
sudo chown $(whoami):$(whoami) /opt/network-discovery
cd /opt/network-discovery
python3 -m venv venv
source venv/bin/activate
pip install pysnmp pysnmp-mibs netaddr netifaces graphviz networkx matplotlib
Create network scanner script
Build the core network discovery script that performs SNMP walks and LLDP neighbor detection across your network ranges.
#!/usr/bin/env python3
import json
import subprocess
import socket
from ipaddress import IPv4Network, AddressValueError
from pysnmp.hlapi import *
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class NetworkScanner:
def __init__(self, community='public', timeout=5, retries=1):
self.community = community
self.timeout = timeout
self.retries = retries
self.discovered_devices = {}
def snmp_get(self, target, oid):
"""Perform SNMP GET operation"""
try:
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((target, 161), timeout=self.timeout, retries=self.retries),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False,
maxRows=50):
if errorIndication:
break
elif errorStatus:
break
else:
return [str(varBind[1]) for varBind in varBinds]
return []
except Exception as e:
logger.debug(f"SNMP error for {target}: {e}")
return []
def get_device_info(self, ip):
"""Extract device information via SNMP"""
device_info = {
'ip': ip,
'hostname': '',
'system_desc': '',
'uptime': '',
'interfaces': [],
'neighbors': [],
'vendor': '',
'model': ''
}
# System information OIDs
hostname_result = self.snmp_get(ip, '1.3.6.1.2.1.1.5.0') # sysName
if hostname_result:
device_info['hostname'] = hostname_result[0]
desc_result = self.snmp_get(ip, '1.3.6.1.2.1.1.1.0') # sysDescr
if desc_result:
device_info['system_desc'] = desc_result[0]
# Parse vendor info from description
desc_lower = desc_result[0].lower()
if 'cisco' in desc_lower:
device_info['vendor'] = 'Cisco'
elif 'juniper' in desc_lower:
device_info['vendor'] = 'Juniper'
elif 'hp' in desc_lower or 'hewlett' in desc_lower:
device_info['vendor'] = 'HP'
elif 'dell' in desc_lower:
device_info['vendor'] = 'Dell'
uptime_result = self.snmp_get(ip, '1.3.6.1.2.1.1.3.0') # sysUpTime
if uptime_result:
device_info['uptime'] = uptime_result[0]
# Interface information
interfaces = self.snmp_get(ip, '1.3.6.1.2.1.2.2.1.2') # ifDescr
if interfaces:
device_info['interfaces'] = interfaces[:10] # Limit to first 10
# LLDP neighbors (if supported)
lldp_neighbors = self.snmp_get(ip, '1.0.8802.1.1.2.1.4.1.1.9') # lldpRemSysName
if lldp_neighbors:
device_info['neighbors'] = lldp_neighbors
return device_info
def scan_network_range(self, network_range, max_workers=50):
"""Scan a network range for SNMP-enabled devices"""
logger.info(f"Scanning network range: {network_range}")
try:
network = IPv4Network(network_range, strict=False)
except AddressValueError:
logger.error(f"Invalid network range: {network_range}")
return
# Skip network and broadcast addresses for /24 and smaller
if network.prefixlen >= 24:
hosts = list(network.hosts())
else:
hosts = list(network)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_ip = {executor.submit(self.check_snmp_device, str(ip)): ip for ip in hosts}
for future in as_completed(future_to_ip, timeout=300):
ip = future_to_ip[future]
try:
result = future.result()
if result:
self.discovered_devices[str(ip)] = result
logger.info(f"Discovered device: {result['hostname']} ({ip})")
except Exception as e:
logger.debug(f"Error scanning {ip}: {e}")
def check_snmp_device(self, ip):
"""Check if device responds to SNMP and gather info"""
# Quick port check first
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(2)
sock.connect((ip, 161))
sock.close()
except:
return None
# Try SNMP
device_info = self.get_device_info(ip)
if device_info['hostname'] or device_info['system_desc']:
return device_info
return None
def get_local_lldp_neighbors(self):
"""Get LLDP neighbors from local daemon"""
try:
result = subprocess.run(['lldpctl', '-f', 'json'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return json.loads(result.stdout)
except Exception as e:
logger.error(f"Error getting LLDP neighbors: {e}")
return {}
def generate_topology_map(self):
"""Generate network topology relationships"""
topology = {
'devices': self.discovered_devices,
'connections': [],
'generated_at': time.strftime('%Y-%m-%d %H:%M:%S')
}
# Add LLDP-discovered connections
for device_ip, device_info in self.discovered_devices.items():
for neighbor in device_info.get('neighbors', []):
# Find neighbor device by hostname
for neighbor_ip, neighbor_info in self.discovered_devices.items():
if neighbor_info.get('hostname', '').lower() == neighbor.lower():
connection = {
'source': device_ip,
'target': neighbor_ip,
'type': 'lldp',
'source_name': device_info.get('hostname', device_ip),
'target_name': neighbor_info.get('hostname', neighbor_ip)
}
if connection not in topology['connections']:
topology['connections'].append(connection)
return topology
def save_results(self, filename='network_topology.json'):
"""Save discovery results to JSON file"""
topology = self.generate_topology_map()
with open(filename, 'w') as f:
json.dump(topology, f, indent=2)
logger.info(f"Results saved to {filename}")
return topology
if __name__ == "__main__":
scanner = NetworkScanner(community='netdiscovery')
# Define networks to scan
networks = [
'192.168.1.0/24',
'10.0.1.0/24'
]
for network in networks:
scanner.scan_network_range(network)
# Save results
topology = scanner.save_results('/opt/network-discovery/topology.json')
print(f"Discovered {len(topology['devices'])} devices")
print(f"Found {len(topology['connections'])} connections")
Create topology visualizer
Build a script that creates visual network diagrams from the discovered topology data using NetworkX and Matplotlib.
#!/usr/bin/env python3
import json
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from datetime import datetime
import argparse
class TopologyVisualizer:
def __init__(self):
self.graph = nx.Graph()
self.vendor_colors = {
'Cisco': '#1BA0D7',
'Juniper': '#84BD00',
'HP': '#0096D6',
'Dell': '#007DB8',
'Unknown': '#666666'
}
def load_topology(self, filename):
"""Load topology data from JSON file"""
try:
with open(filename, 'r') as f:
return json.load(f)
except FileNotFoundError:
print(f"Error: Topology file {filename} not found")
return None
except json.JSONDecodeError:
print(f"Error: Invalid JSON in {filename}")
return None
def build_graph(self, topology_data):
"""Build NetworkX graph from topology data"""
devices = topology_data.get('devices', {})
connections = topology_data.get('connections', [])
# Add nodes
for ip, device_info in devices.items():
hostname = device_info.get('hostname', ip)
vendor = device_info.get('vendor', 'Unknown')
interfaces = len(device_info.get('interfaces', []))
self.graph.add_node(ip,
hostname=hostname,
vendor=vendor,
interfaces=interfaces,
system_desc=device_info.get('system_desc', '')[:50])
# Add edges
for connection in connections:
source = connection['source']
target = connection['target']
if source in self.graph and target in self.graph:
self.graph.add_edge(source, target,
connection_type=connection.get('type', 'unknown'))
def create_visualization(self, output_file='network_topology.png', layout='spring'):
"""Create network topology visualization"""
if len(self.graph.nodes()) == 0:
print("No devices to visualize")
return
# Set up the plot
plt.figure(figsize=(16, 12))
ax = plt.gca()
# Choose layout algorithm
if layout == 'spring':
pos = nx.spring_layout(self.graph, k=3, iterations=50)
elif layout == 'circular':
pos = nx.circular_layout(self.graph)
elif layout == 'kamada_kawai':
pos = nx.kamada_kawai_layout(self.graph)
else:
pos = nx.spring_layout(self.graph)
# Draw edges
nx.draw_networkx_edges(self.graph, pos,
edge_color='#CCCCCC',
width=2,
alpha=0.7)
# Draw nodes by vendor
for vendor, color in self.vendor_colors.items():
vendor_nodes = [n for n, d in self.graph.nodes(data=True)
if d.get('vendor', 'Unknown') == vendor]
if vendor_nodes:
nx.draw_networkx_nodes(self.graph, pos,
nodelist=vendor_nodes,
node_color=color,
node_size=800,
alpha=0.8)
# Add labels
labels = {}
for node, data in self.graph.nodes(data=True):
hostname = data.get('hostname', node)
if len(hostname) > 15:
hostname = hostname[:12] + '...'
labels[node] = f"{hostname}\n({node})"
nx.draw_networkx_labels(self.graph, pos, labels, font_size=8, font_weight='bold')
# Create legend
legend_elements = []
for vendor, color in self.vendor_colors.items():
vendor_nodes = [n for n, d in self.graph.nodes(data=True)
if d.get('vendor', 'Unknown') == vendor]
if vendor_nodes:
legend_elements.append(patches.Patch(color=color, label=f'{vendor} ({len(vendor_nodes)})'))
plt.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1))
# Add title and info
plt.title(f"Network Topology Discovery\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Devices: {len(self.graph.nodes())} | Connections: {len(self.graph.edges())}",
fontsize=14, pad=20)
# Remove axis
ax.set_axis_off()
# Save the plot
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight',
facecolor='white', edgecolor='none')
plt.close()
print(f"Network topology visualization saved to {output_file}")
def generate_report(self, topology_data, output_file='network_report.txt'):
"""Generate text report of discovered network"""
devices = topology_data.get('devices', {})
connections = topology_data.get('connections', [])
report = []
report.append("NETWORK TOPOLOGY DISCOVERY REPORT")
report.append("=" * 50)
report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Total Devices: {len(devices)}")
report.append(f"Total Connections: {len(connections)}")
report.append("")
# Device summary by vendor
vendor_count = {}
for device_info in devices.values():
vendor = device_info.get('vendor', 'Unknown')
vendor_count[vendor] = vendor_count.get(vendor, 0) + 1
report.append("DEVICES BY VENDOR:")
report.append("-" * 20)
for vendor, count in sorted(vendor_count.items()):
report.append(f"{vendor}: {count}")
report.append("")
# Detailed device list
report.append("DEVICE DETAILS:")
report.append("-" * 15)
for ip, device_info in sorted(devices.items()):
hostname = device_info.get('hostname', 'Unknown')
vendor = device_info.get('vendor', 'Unknown')
interfaces = len(device_info.get('interfaces', []))
neighbors = len(device_info.get('neighbors', []))
report.append(f"IP: {ip}")
report.append(f" Hostname: {hostname}")
report.append(f" Vendor: {vendor}")
report.append(f" Interfaces: {interfaces}")
report.append(f" LLDP Neighbors: {neighbors}")
report.append(f" Description: {device_info.get('system_desc', 'N/A')[:100]}")
report.append("")
# Connection details
if connections:
report.append("NETWORK CONNECTIONS:")
report.append("-" * 20)
for conn in connections:
report.append(f"{conn.get('source_name', conn['source'])} <-> "
f"{conn.get('target_name', conn['target'])} ({conn.get('type', 'unknown')})")
# Write report
with open(output_file, 'w') as f:
f.write('\n'.join(report))
print(f"Network report saved to {output_file}")
def main():
parser = argparse.ArgumentParser(description='Network Topology Visualizer')
parser.add_argument('-i', '--input', default='topology.json',
help='Input topology JSON file')
parser.add_argument('-o', '--output', default='network_topology.png',
help='Output visualization file')
parser.add_argument('-r', '--report', default='network_report.txt',
help='Output report file')
parser.add_argument('-l', '--layout', choices=['spring', 'circular', 'kamada_kawai'],
default='spring', help='Graph layout algorithm')
args = parser.parse_args()
visualizer = TopologyVisualizer()
topology_data = visualizer.load_topology(args.input)
if topology_data:
visualizer.build_graph(topology_data)
visualizer.create_visualization(args.output, args.layout)
visualizer.generate_report(topology_data, args.report)
else:
print("Failed to load topology data")
if __name__ == "__main__":
main()
Create automated discovery service
Set up a systemd service and timer for automated periodic network discovery.
#!/usr/bin/env python3
import os
import sys
import logging
import yaml
from pathlib import Path
Add the virtual environment to Python path
sys.path.insert(0, '/opt/network-discovery/venv/lib/python3.*/site-packages')
from network_scanner import NetworkScanner
from topology_visualizer import TopologyVisualizer
Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/network-discovery.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, config_file='/opt/network-discovery/config.yaml'):
self.config = self.load_config(config_file)
self.base_dir = Path('/opt/network-discovery')
def load_config(self, config_file):
"""Load configuration from YAML file"""
default_config = {
'networks': ['192.168.1.0/24', '10.0.1.0/24'],
'snmp_community': 'netdiscovery',
'output_dir': '/opt/network-discovery/results',
'max_workers': 50,
'generate_visualization': True,
'generate_report': True
}
try:
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
# Merge with defaults
default_config.update(config)
return default_config
except FileNotFoundError:
logger.warning(f"Config file {config_file} not found, using defaults")
return default_config
except Exception as e:
logger.error(f"Error loading config: {e}")
return default_config
def run_discovery(self):
"""Run network discovery process"""
logger.info("Starting network topology discovery")
# Create output directory
output_dir = Path(self.config['output_dir'])
output_dir.mkdir(parents=True, exist_ok=True)
# Initialize scanner
scanner = NetworkScanner(
community=self.config['snmp_community'],
timeout=5,
retries=1
)
# Scan networks
for network in self.config['networks']:
logger.info(f"Scanning network: {network}")
scanner.scan_network_range(network, self.config['max_workers'])
# Save results
timestamp = time.strftime('%Y%m%d_%H%M%S')
topology_file = output_dir / f'topology_{timestamp}.json'
topology_data = scanner.save_results(str(topology_file))
# Create symlink to latest
latest_link = output_dir / 'topology_latest.json'
if latest_link.exists():
latest_link.unlink()
latest_link.symlink_to(topology_file.name)
logger.info(f"Discovery complete: {len(topology_data['devices'])} devices found")
# Generate visualization if enabled
if self.config.get('generate_visualization', False):
self.generate_visualization(topology_data, output_dir, timestamp)
# Generate report if enabled
if self.config.get('generate_report', False):
self.generate_report(topology_data, output_dir, timestamp)
return topology_data
def generate_visualization(self, topology_data, output_dir, timestamp):
"""Generate network visualization"""
try:
visualizer = TopologyVisualizer()
visualizer.build_graph(topology_data)
viz_file = output_dir / f'topology_{timestamp}.png'
visualizer.create_visualization(str(viz_file))
# Create symlink to latest
latest_link = output_dir / 'topology_latest.png'
if latest_link.exists():
latest_link.unlink()
latest_link.symlink_to(viz_file.name)
logger.info(f"Visualization generated: {viz_file}")
except Exception as e:
logger.error(f"Error generating visualization: {e}")
def generate_report(self, topology_data, output_dir, timestamp):
"""Generate network report"""
try:
visualizer = TopologyVisualizer()
report_file = output_dir / f'report_{timestamp}.txt'
visualizer.generate_report(topology_data, str(report_file))
# Create symlink to latest
latest_link = output_dir / 'report_latest.txt'
if latest_link.exists():
latest_link.unlink()
latest_link.symlink_to(report_file.name)
logger.info(f"Report generated: {report_file}")
except Exception as e:
logger.error(f"Error generating report: {e}")
if __name__ == "__main__":
import time
service = DiscoveryService()
try:
topology_data = service.run_discovery()
logger.info("Network discovery service completed successfully")
except Exception as e:
logger.error(f"Discovery service failed: {e}")
sys.exit(1)
Create configuration file
Set up the YAML configuration file with your network ranges and discovery settings.
# Network Discovery Configuration
Networks to scan (CIDR notation)
networks:
- "192.168.1.0/24"
- "10.0.1.0/24"
- "172.16.1.0/24"
SNMP settings
snmp_community: "netdiscovery"
snmp_timeout: 5
snmp_retries: 1
Discovery settings
max_workers: 50
scan_interval_hours: 6
Output settings
output_dir: "/opt/network-discovery/results"
generate_visualization: true
generate_report: true
keep_history_days: 30
Integration settings
prometheus_pushgateway: null
grafana_webhook: null
slack_webhook: null
Device filters (optional)
include_vendors: []
exclude_vendors: []
include_device_types: []
exclude_device_types: []
Set up systemd service
Create a systemd service unit for the network discovery daemon.
[Unit]
Description=Network Topology Discovery Service
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=root
Group=root
WorkingDirectory=/opt/network-discovery
Environment=PATH=/opt/network-discovery/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ExecStart=/opt/network-discovery/venv/bin/python /opt/network-discovery/discovery_service.py
StandardOutput=journal
StandardError=journal
SyslogIdentifier=network-discovery
[Install]
WantedBy=multi-user.target
Create systemd timer
Set up a timer to run network discovery automatically every 6 hours.
[Unit]
Description=Network Discovery Timer
Requires=network-discovery.service
[Timer]
Run every 6 hours
OnCalendar=--* 00,06,12,18:00:00
Run 5 minutes after boot if we missed the last scheduled run
OnBootSec=5min
If the system was suspended/off, run immediately when it comes back
Persistent=true
[Install]
WantedBy=timers.target
Set proper permissions
Configure file ownership and permissions for the discovery service files and directories.
# Set ownership for discovery directory
sudo chown -R root:root /opt/network-discovery
Set executable permissions for scripts
sudo chmod 755 /opt/network-discovery/*.py
Set read/write for config and data files
sudo chmod 644 /opt/network-discovery/config.yaml
Create results directory with proper permissions
sudo mkdir -p /opt/network-discovery/results
sudo chmod 755 /opt/network-discovery/results
Set log file permissions
sudo touch /var/log/network-discovery.log
sudo chmod 644 /var/log/network-discovery.log
Enable and start the service
Enable the systemd timer and run an initial discovery to test the setup.
# Reload systemd daemon
sudo systemctl daemon-reload
Enable and start the timer
sudo systemctl enable --now network-discovery.timer
Check timer status
sudo systemctl status network-discovery.timer
Run initial discovery manually
sudo systemctl start network-discovery.service
Check service status
sudo systemctl status network-discovery.service
Configure firewall rules
Open necessary ports for SNMP and LLDP communication in your firewall.
# Allow SNMP (UDP 161) for discovery queries
sudo ufw allow 161/udp comment 'SNMP discovery'
Allow LLDP multicast traffic
sudo ufw allow from any to 224.0.0.0/8
Check firewall status
sudo ufw status numbered
Integration with monitoring systems
Create Prometheus metrics exporter
Build a Prometheus exporter to make discovery metrics available for monitoring dashboards. This integrates with existing Prometheus and Grafana monitoring setups.
#!/usr/bin/env python3
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
import threading
import logging
logger = logging.getLogger(__name__)
class NetworkDiscoveryExporter:
def __init__(self, topology_file='/opt/network-discovery/results/topology_latest.json'):
self.topology_file = topology_file
self.metrics = {}
self.last_update = 0
def load_topology_data(self):
"""Load latest topology data"""
try:
topology_path = Path(self.topology_file)
if not topology_path.exists():
return {}
# Check if file was updated
file_mtime = topology_path.stat().st_mtime
if file_mtime <= self.last_update:
return self.metrics
with open(topology_path, 'r') as f:
topology_data = json.load(f)
self.last_update = file_mtime
self.generate_metrics(topology_data)
return self.metrics
except Exception as e:
logger.error(f"Error loading topology data: {e}")
return {}
def generate_metrics(self, topology_data):
"""Generate Prometheus metrics from topology data"""
devices = topology_data.get('devices', {})
connections = topology_data.get('connections', [])
# Count devices by vendor
vendor_counts = {}
interface_counts = {}
for ip, device_info in devices.items():
vendor = device_info.get('vendor', 'Unknown')
vendor_counts[vendor] = vendor_counts.get(vendor, 0) + 1
# Interface count per device
interface_count = len(device_info.get('interfaces', []))
interface_counts[ip] = interface_count
# Build metrics
self.metrics = {
'network_discovery_devices_total': len(devices),
'network_discovery_connections_total': len(connections),
'network_discovery_vendors': vendor_counts,
'network_discovery_interfaces': interface_counts,
'network_discovery_last_scan': int(time.time())
}
def format_prometheus_metrics(self):
"""Format metrics for Prometheus"""
self.load_topology_data()
output = []
# Total devices
output.append('# HELP network_discovery_devices_total Total number of discovered devices')
output.append('# TYPE network_discovery_devices_total gauge')
output.append(f'network_discovery_devices_total {self.metrics.get("network_discovery_devices_total", 0)}')
output.append('')
# Total connections
output.append('# HELP network_discovery_connections_total Total number of network connections')
output.append('# TYPE network_discovery_connections_total gauge')
output.append(f'network_discovery_connections_total {self.metrics.get("network_discovery_connections_total", 0)}')
output.append('')
# Devices by vendor
output.append('# HELP network_discovery_devices_by_vendor Devices grouped by vendor')
output.append('# TYPE network_discovery_devices_by_vendor gauge')
for vendor, count in self.metrics.get('network_discovery_vendors', {}).items():
output.append(f'network_discovery_devices_by_vendor{{vendor="{vendor}"}} {count}')
output.append('')
# Last scan timestamp
output.append('# HELP network_discovery_last_scan_timestamp Unix timestamp of last scan')
output.append('# TYPE network_discovery_last_scan_timestamp gauge')
output.append(f'network_discovery_last_scan_timestamp {self.metrics.get("network_discovery_last_scan", 0)}')
output.append('')
return '\n'.join(output)
class PrometheusHandler(BaseHTTPRequestHandler):
def __init__(self, *args, exporter=None, **kwargs):
self.exporter = exporter
super().__init__(*args, **kwargs)
def do_GET(self):
if self.path == '/metrics':
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.end_headers()
metrics = self.exporter.format_prometheus_metrics()
self.wfile.write(metrics.encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
# Suppress default HTTP logging
pass
def run_prometheus_exporter(port=9090):
"""Run Prometheus metrics exporter"""
exporter = NetworkDiscoveryExporter()
def handler(*args, **kwargs):
PrometheusHandler(*args, exporter=exporter, **kwargs)
server = HTTPServer(('0.0.0.0', port), handler)
logger.info(f"Network Discovery Prometheus exporter running on port {port}")
server.serve_forever()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_prometheus_exporter()
Create Grafana dashboard configuration
Set up a JSON dashboard configuration that can be imported into Grafana for network topology visualization.
{
"dashboard": {
"id": null,
"title": "Network Topology Discovery",
"tags": ["network", "topology", "snmp", "lldp"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "Total Discovered Devices",
"type": "stat",
"targets": [
{
"expr": "network_discovery_devices_total",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"unit": "short"
}
},
"gridPos": {"h": 8, "w": 6, "x": 0, "y": 0}
},
{
"id": 2,
"title": "Network Connections",
"type": "stat",
"targets": [
{
"expr": "network_discovery_connections_total",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"color": {"mode": "palette-classic"},
"unit": "short"
}
},
"gridPos": {"h": 8, "w": 6, "x": 6, "y": 0}
},
{
"id": 3,
"title": "Devices by Vendor",
"type": "piechart",
"targets": [
{
"expr": "network_discovery_devices_by_vendor",
"refId": "A"
}
],
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
},
{
"id": 4,
"title": "Discovery Timeline",
"type": "timeseries",
"targets": [
{
"expr": "network_discovery_devices_total",
"refId": "A",
"legendFormat": "Total Devices"
},
{
"expr": "network_discovery_connections_total",
"refId": "B",
"legendFormat": "Connections"
}
],
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
}
],
"time": {
"from": "now-24h",
"to": "now"
},
"refresh": "30s"
}
}
Verify your setup
# Check service status
sudo systemctl status network-discovery.timer
sudo systemctl status network-discovery.service
View recent discovery logs
sudo journalctl -u network-discovery.service -n 50
Check SNMP and LLDP services
sudo systemctl status snmpd lldpd
Test SNMP connectivity to a device
snmpwalk -v2c -c netdiscovery 192.168.1.1 1.3.6.1.2.1.1.5
Check LLDP neighbors
lldpctl
View discovery results
ls -la /opt/network-discovery/results/
cat /opt/network-discovery/results/report_latest.txt
Test Python dependencies
cd /opt/network-discovery
source venv/bin/activate
python -c "import pysnmp, networkx, matplotlib; print('All dependencies OK')"
Check firewall rules
sudo ufw status # Ubuntu/Debian
sudo firewall-cmd --list-all # AlmaLinux/Rocky
You can also access the example monitoring integration by linking this setup with SNMP monitoring using Grafana dashboards for enhanced visualization capabilities.
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| No devices discovered | SNMP community mismatch or firewall blocking | Verify SNMP community string and check sudo ufw status for firewall rules |
| Permission denied errors | Incorrect file ownership or permissions | Run sudo chown -R root:root /opt/network-discovery and sudo chmod 755 /opt/network-discovery/*.py |
| Python import errors | Virtual environment not activated or missing dependencies | Activate venv with source /opt/network-discovery/venv/bin/activate and reinstall packages |
| LLDP neighbors not found | LLDP not enabled on network devices | Enable LLDP on switches and routers, check with lldpctl |
| Visualization generation fails | Missing display backend for matplotlib | Install with sudo apt install python3-tk or set MPLBACKEND=Agg environment variable |
| Service fails to start | Configuration file syntax error | Check YAML syntax in /opt/network-discovery/config.yaml and view logs with journalctl -u network-discovery.service |
Next steps
- Enhance SNMP device auto-discovery with advanced scanning techniques
- Integrate with Nagios for SNMP-based network device monitoring
- Set up automated network topology change alerts with Prometheus and Grafana
- Automate network device configuration backup based on discovery results
- Implement network security vulnerability scanning integrated with topology discovery
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly NC='\033[0m' # No Color
# Default configuration
COMMUNITY_STRING="${1:-netdiscovery}"
CONTACT_EMAIL="${2:-admin@example.com}"
LOCATION="${3:-Network Operations Center}"
# Print usage if help requested
if [[ "${1:-}" == "-h" ]] || [[ "${1:-}" == "--help" ]]; then
echo "Usage: $0 [COMMUNITY_STRING] [CONTACT_EMAIL] [LOCATION]"
echo "Example: $0 mynetwork admin@company.com 'Data Center'"
exit 0
fi
# Logging functions
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }
# Cleanup on error
cleanup() {
log_error "Installation failed. Cleaning up..."
if [[ -d /opt/network-discovery ]]; then
rm -rf /opt/network-discovery
fi
systemctl stop snmpd lldpd 2>/dev/null || true
}
trap cleanup ERR
# Check prerequisites
check_prerequisites() {
if [[ $EUID -ne 0 ]]; then
log_error "This script must be run as root"
exit 1
fi
if ! command -v systemctl &> /dev/null; then
log_error "systemd is required"
exit 1
fi
}
# Auto-detect distribution
detect_distro() {
if [[ ! -f /etc/os-release ]]; then
log_error "Cannot detect distribution - /etc/os-release not found"
exit 1
fi
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update && apt upgrade -y"
SNMP_PACKAGES="snmp snmp-mibs-downloader lldpd nmap python3-pip python3-venv git"
SNMP_CONFIG="/etc/snmp/snmpd.conf"
LLDP_CONFIG="/etc/lldpd.d/01-custom.conf"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
SNMP_PACKAGES="net-snmp net-snmp-utils lldpd nmap python3-pip python3-virtualenv git"
SNMP_CONFIG="/etc/snmp/snmpd.conf"
LLDP_CONFIG="/etc/lldpd.d/01-custom.conf"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
SNMP_PACKAGES="net-snmp net-snmp-utils lldpd nmap python3-pip python3-virtualenv git"
SNMP_CONFIG="/etc/snmp/snmpd.conf"
LLDP_CONFIG="/etc/lldpd.d/01-custom.conf"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
log_info "Detected distribution: $PRETTY_NAME"
}
main() {
log_info "Starting network topology discovery setup..."
echo "[1/8] Checking prerequisites..."
check_prerequisites
detect_distro
echo "[2/8] Updating system packages..."
eval "$PKG_UPDATE"
echo "[3/8] Installing SNMP and LLDP packages..."
eval "$PKG_INSTALL $SNMP_PACKAGES"
echo "[4/8] Configuring SNMP daemon..."
mkdir -p "$(dirname "$SNMP_CONFIG")"
cat > "$SNMP_CONFIG" << EOF
# Community string configuration
rocommunity public 127.0.0.1
rocommunity $COMMUNITY_STRING 10.0.0.0/8
rocommunity $COMMUNITY_STRING 172.16.0.0/12
rocommunity $COMMUNITY_STRING 192.168.0.0/16
# System information
sysLocation "$LOCATION"
sysContact "$CONTACT_EMAIL"
sysServices 72
# Access control
com2sec readonly default $COMMUNITY_STRING
group MyROGroup v1 readonly
group MyROGroup v2c readonly
# OID access restrictions
view all included .1.3.6.1.2.1.1
view all included .1.3.6.1.2.1.2
view all included .1.3.6.1.2.1.4
view all included .1.3.6.1.4.1
access MyROGroup "" any noauth exact all none none
# Disable SNMP v1/v2c write access
rwcommunity private 127.0.0.1
EOF
chmod 644 "$SNMP_CONFIG"
echo "[5/8] Configuring LLDP daemon..."
mkdir -p "$(dirname "$LLDP_CONFIG")"
cat > "$LLDP_CONFIG" << EOF
# Configure LLDP daemon
configure lldp tx-interval 30
configure lldp tx-hold 4
configure system hostname discovery-server
configure system description "Network Discovery Server"
configure system platform "Linux"
# Enable CDP compatibility for Cisco devices
configure lldp custom-tlv oui 00,00,0c subtype 1
# Interface configuration for primary interface
configure med fast-start enable
EOF
chmod 644 "$LLDP_CONFIG"
echo "[6/8] Creating Python virtual environment..."
mkdir -p /opt/network-discovery
cd /opt/network-discovery
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install pysnmp pysnmp-mibs netaddr netifaces graphviz networkx matplotlib
echo "[7/8] Creating network scanner script..."
cat > /opt/network-discovery/network_scanner.py << 'EOF'
#!/usr/bin/env python3
import json
import subprocess
import socket
from ipaddress import IPv4Network, AddressValueError
from pysnmp.hlapi import *
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class NetworkScanner:
def __init__(self, community='netdiscovery', timeout=5, retries=1):
self.community = community
self.timeout = timeout
self.retries = retries
self.discovered_devices = {}
def snmp_get(self, target, oid):
"""Perform SNMP GET operation"""
try:
iterator = getCmd(
SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((target, 161), timeout=self.timeout, retries=self.retries),
ContextData(),
ObjectType(ObjectIdentity(oid))
)
errorIndication, errorStatus, errorIndex, varBinds = next(iterator)
if errorIndication or errorStatus:
return None
return str(varBinds[0][1])
except Exception:
return None
def scan_device(self, ip):
"""Scan a single device for SNMP information"""
device_info = {'ip': ip, 'reachable': False}
# Check if device responds to SNMP
sysname = self.snmp_get(ip, '1.3.6.1.2.1.1.5.0')
if sysname:
device_info['reachable'] = True
device_info['hostname'] = sysname
device_info['sysdesc'] = self.snmp_get(ip, '1.3.6.1.2.1.1.1.0')
device_info['uptime'] = self.snmp_get(ip, '1.3.6.1.2.1.1.3.0')
return device_info
def scan_network(self, network_range):
"""Scan network range for SNMP-enabled devices"""
try:
network = IPv4Network(network_range, strict=False)
except AddressValueError:
logger.error(f"Invalid network range: {network_range}")
return
logger.info(f"Scanning network range: {network}")
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(self.scan_device, str(ip)): ip for ip in network.hosts()}
for future in as_completed(futures):
result = future.result()
if result['reachable']:
self.discovered_devices[result['ip']] = result
logger.info(f"Found device: {result['ip']} - {result.get('hostname', 'Unknown')}")
def save_results(self, filename='network_topology.json'):
"""Save discovery results to JSON file"""
with open(filename, 'w') as f:
json.dump(self.discovered_devices, f, indent=2)
logger.info(f"Results saved to {filename}")
if __name__ == '__main__':
scanner = NetworkScanner()
# Scan common private network ranges
scanner.scan_network('192.168.1.0/24')
scanner.scan_network('10.0.0.0/24')
scanner.save_results()
EOF
chmod 755 /opt/network-discovery/network_scanner.py
chown -R root:root /opt/network-discovery
# Create systemd service file
cat > /etc/systemd/system/network-discovery.service << EOF
[Unit]
Description=Network Discovery Scanner
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=root
WorkingDirectory=/opt/network-discovery
Environment=PATH=/opt/network-discovery/venv/bin
ExecStart=/opt/network-discovery/venv/bin/python /opt/network-discovery/network_scanner.py
[Install]
WantedBy=multi-user.target
EOF
chmod 644 /etc/systemd/system/network-discovery.service
systemctl daemon-reload
echo "[8/8] Starting and enabling services..."
systemctl enable --now snmpd
systemctl enable --now lldpd
# Configure firewall if active
if systemctl is-active --quiet firewalld; then
firewall-cmd --permanent --add-port=161/udp --add-port=162/udp
firewall-cmd --reload
log_info "Configured firewalld for SNMP"
elif systemctl is-active --quiet ufw; then
ufw allow 161/udp
ufw allow 162/udp
log_info "Configured UFW for SNMP"
fi
# Verification
log_info "Verifying installation..."
if systemctl is-active --quiet snmpd && systemctl is-active --quiet lldpd; then
log_info "✓ SNMP and LLDP services are running"
else
log_error "✗ Services failed to start"
exit 1
fi
if [[ -x /opt/network-discovery/network_scanner.py ]]; then
log_info "✓ Network scanner script installed"
else
log_error "✗ Network scanner script missing"
exit 1
fi
log_info "Network topology discovery setup completed successfully!"
log_info "Usage: cd /opt/network-discovery && source venv/bin/activate && python network_scanner.py"
log_info "Or run: systemctl start network-discovery"
}
main "$@"
Review the script before running. Execute with: bash install.sh