Set up automated network topology discovery with SNMP and LLDP for infrastructure mapping

Advanced 45 min Apr 11, 2026 400 views
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Build an automated network discovery system that uses SNMP and LLDP protocols to map your infrastructure topology, detect device relationships, and create visual network diagrams with real-time monitoring integration.

Prerequisites

  • Root access to server
  • Network devices with SNMP enabled
  • Python 3.8+
  • Network connectivity to target devices

What this solves

Network topology discovery helps you automatically map your infrastructure by scanning devices, detecting their relationships through SNMP and LLDP protocols, and creating visual representations of your network. This approach eliminates manual network documentation, reduces configuration drift detection time, and provides real-time visibility into your infrastructure changes.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you have the latest security patches and package versions.

sudo apt update && sudo apt upgrade -y
sudo dnf update -y

Install SNMP tools and LLDP utilities

Install the core SNMP utilities, LLDP daemon, and network scanning tools needed for device discovery and protocol communication.

sudo apt install -y snmp snmp-mibs-downloader lldpd nmap python3-pip python3-venv git
sudo dnf install -y net-snmp net-snmp-utils lldpd nmap python3-pip python3-virtualenv git

Configure SNMP daemon

Set up the SNMP daemon with proper community strings and access controls for network device communication.

# Community string configuration
rocommunity public 127.0.0.1
rocommunity netdiscovery 10.0.0.0/8
rocommunity netdiscovery 172.16.0.0/12
rocommunity netdiscovery 192.168.0.0/16

System information

sysLocation "Network Operations Center" sysContact "admin@example.com" sysServices 72

Access control

com2sec readonly default netdiscovery group MyROGroup v1 readonly group MyROGroup v2c readonly

OID access restrictions

view all included .1.3.6.1.2.1.1 view all included .1.3.6.1.2.1.2 view all included .1.3.6.1.2.1.4 view all included .1.3.6.1.4.1 access MyROGroup "" any noauth exact all none none

Disable SNMP v1/v2c write access

rwcommunity private 127.0.0.1

Configure LLDP daemon

Enable LLDP on network interfaces to discover directly connected devices and their capabilities.

# Configure LLDP daemon
configure lldp tx-interval 30
configure lldp tx-hold 4
configure system hostname discovery-server
configure system description "Network Discovery Server"
configure system platform "Linux"

Enable CDP compatibility for Cisco devices

configure lldp custom-tlv oui 00,00,0c subtype 1

Interface configuration

configure ports eth0 lldp portidsubtype local eth0 configure med fast-start enable

Enable and start services

Start the SNMP and LLDP daemons and enable them to start automatically on boot.

sudo systemctl enable --now snmpd
sudo systemctl enable --now lldpd
sudo systemctl status snmpd lldpd

Create Python virtual environment

Set up an isolated Python environment for the network discovery scripts and required libraries.

sudo mkdir -p /opt/network-discovery
sudo chown $(whoami):$(whoami) /opt/network-discovery
cd /opt/network-discovery
python3 -m venv venv
source venv/bin/activate
pip install pysnmp pysnmp-mibs netaddr netifaces graphviz networkx matplotlib

Create network scanner script

Build the core network discovery script that performs SNMP walks and LLDP neighbor detection across your network ranges.

#!/usr/bin/env python3
import json
import subprocess
import socket
from ipaddress import IPv4Network, AddressValueError
from pysnmp.hlapi import *
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class NetworkScanner:
    def __init__(self, community='public', timeout=5, retries=1):
        self.community = community
        self.timeout = timeout
        self.retries = retries
        self.discovered_devices = {}
        
    def snmp_get(self, target, oid):
        """Perform SNMP GET operation"""
        try:
            for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
                SnmpEngine(),
                CommunityData(self.community),
                UdpTransportTarget((target, 161), timeout=self.timeout, retries=self.retries),
                ContextData(),
                ObjectType(ObjectIdentity(oid)),
                lexicographicMode=False,
                maxRows=50):
                
                if errorIndication:
                    break
                elif errorStatus:
                    break
                else:
                    return [str(varBind[1]) for varBind in varBinds]
            return []
        except Exception as e:
            logger.debug(f"SNMP error for {target}: {e}")
            return []
    
    def get_device_info(self, ip):
        """Extract device information via SNMP"""
        device_info = {
            'ip': ip,
            'hostname': '',
            'system_desc': '',
            'uptime': '',
            'interfaces': [],
            'neighbors': [],
            'vendor': '',
            'model': ''
        }
        
        # System information OIDs
        hostname_result = self.snmp_get(ip, '1.3.6.1.2.1.1.5.0')  # sysName
        if hostname_result:
            device_info['hostname'] = hostname_result[0]
        
        desc_result = self.snmp_get(ip, '1.3.6.1.2.1.1.1.0')  # sysDescr
        if desc_result:
            device_info['system_desc'] = desc_result[0]
            # Parse vendor info from description
            desc_lower = desc_result[0].lower()
            if 'cisco' in desc_lower:
                device_info['vendor'] = 'Cisco'
            elif 'juniper' in desc_lower:
                device_info['vendor'] = 'Juniper'
            elif 'hp' in desc_lower or 'hewlett' in desc_lower:
                device_info['vendor'] = 'HP'
            elif 'dell' in desc_lower:
                device_info['vendor'] = 'Dell'
        
        uptime_result = self.snmp_get(ip, '1.3.6.1.2.1.1.3.0')  # sysUpTime
        if uptime_result:
            device_info['uptime'] = uptime_result[0]
        
        # Interface information
        interfaces = self.snmp_get(ip, '1.3.6.1.2.1.2.2.1.2')  # ifDescr
        if interfaces:
            device_info['interfaces'] = interfaces[:10]  # Limit to first 10
        
        # LLDP neighbors (if supported)
        lldp_neighbors = self.snmp_get(ip, '1.0.8802.1.1.2.1.4.1.1.9')  # lldpRemSysName
        if lldp_neighbors:
            device_info['neighbors'] = lldp_neighbors
        
        return device_info
    
    def scan_network_range(self, network_range, max_workers=50):
        """Scan a network range for SNMP-enabled devices"""
        logger.info(f"Scanning network range: {network_range}")
        
        try:
            network = IPv4Network(network_range, strict=False)
        except AddressValueError:
            logger.error(f"Invalid network range: {network_range}")
            return
        
        # Skip network and broadcast addresses for /24 and smaller
        if network.prefixlen >= 24:
            hosts = list(network.hosts())
        else:
            hosts = list(network)
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_ip = {executor.submit(self.check_snmp_device, str(ip)): ip for ip in hosts}
            
            for future in as_completed(future_to_ip, timeout=300):
                ip = future_to_ip[future]
                try:
                    result = future.result()
                    if result:
                        self.discovered_devices[str(ip)] = result
                        logger.info(f"Discovered device: {result['hostname']} ({ip})")
                except Exception as e:
                    logger.debug(f"Error scanning {ip}: {e}")
    
    def check_snmp_device(self, ip):
        """Check if device responds to SNMP and gather info"""
        # Quick port check first
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(2)
            sock.connect((ip, 161))
            sock.close()
        except:
            return None
        
        # Try SNMP
        device_info = self.get_device_info(ip)
        if device_info['hostname'] or device_info['system_desc']:
            return device_info
        return None
    
    def get_local_lldp_neighbors(self):
        """Get LLDP neighbors from local daemon"""
        try:
            result = subprocess.run(['lldpctl', '-f', 'json'], 
                                  capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                return json.loads(result.stdout)
        except Exception as e:
            logger.error(f"Error getting LLDP neighbors: {e}")
        return {}
    
    def generate_topology_map(self):
        """Generate network topology relationships"""
        topology = {
            'devices': self.discovered_devices,
            'connections': [],
            'generated_at': time.strftime('%Y-%m-%d %H:%M:%S')
        }
        
        # Add LLDP-discovered connections
        for device_ip, device_info in self.discovered_devices.items():
            for neighbor in device_info.get('neighbors', []):
                # Find neighbor device by hostname
                for neighbor_ip, neighbor_info in self.discovered_devices.items():
                    if neighbor_info.get('hostname', '').lower() == neighbor.lower():
                        connection = {
                            'source': device_ip,
                            'target': neighbor_ip,
                            'type': 'lldp',
                            'source_name': device_info.get('hostname', device_ip),
                            'target_name': neighbor_info.get('hostname', neighbor_ip)
                        }
                        if connection not in topology['connections']:
                            topology['connections'].append(connection)
        
        return topology
    
    def save_results(self, filename='network_topology.json'):
        """Save discovery results to JSON file"""
        topology = self.generate_topology_map()
        with open(filename, 'w') as f:
            json.dump(topology, f, indent=2)
        logger.info(f"Results saved to {filename}")
        return topology

if __name__ == "__main__":
    scanner = NetworkScanner(community='netdiscovery')
    
    # Define networks to scan
    networks = [
        '192.168.1.0/24',
        '10.0.1.0/24'
    ]
    
    for network in networks:
        scanner.scan_network_range(network)
    
    # Save results
    topology = scanner.save_results('/opt/network-discovery/topology.json')
    print(f"Discovered {len(topology['devices'])} devices")
    print(f"Found {len(topology['connections'])} connections")

Create topology visualizer

Build a script that creates visual network diagrams from the discovered topology data using NetworkX and Matplotlib.

#!/usr/bin/env python3
import json
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from datetime import datetime
import argparse

class TopologyVisualizer:
    def __init__(self):
        self.graph = nx.Graph()
        self.vendor_colors = {
            'Cisco': '#1BA0D7',
            'Juniper': '#84BD00', 
            'HP': '#0096D6',
            'Dell': '#007DB8',
            'Unknown': '#666666'
        }
    
    def load_topology(self, filename):
        """Load topology data from JSON file"""
        try:
            with open(filename, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            print(f"Error: Topology file {filename} not found")
            return None
        except json.JSONDecodeError:
            print(f"Error: Invalid JSON in {filename}")
            return None
    
    def build_graph(self, topology_data):
        """Build NetworkX graph from topology data"""
        devices = topology_data.get('devices', {})
        connections = topology_data.get('connections', [])
        
        # Add nodes
        for ip, device_info in devices.items():
            hostname = device_info.get('hostname', ip)
            vendor = device_info.get('vendor', 'Unknown')
            interfaces = len(device_info.get('interfaces', []))
            
            self.graph.add_node(ip, 
                              hostname=hostname,
                              vendor=vendor,
                              interfaces=interfaces,
                              system_desc=device_info.get('system_desc', '')[:50])
        
        # Add edges
        for connection in connections:
            source = connection['source']
            target = connection['target']
            if source in self.graph and target in self.graph:
                self.graph.add_edge(source, target, 
                                  connection_type=connection.get('type', 'unknown'))
    
    def create_visualization(self, output_file='network_topology.png', layout='spring'):
        """Create network topology visualization"""
        if len(self.graph.nodes()) == 0:
            print("No devices to visualize")
            return
        
        # Set up the plot
        plt.figure(figsize=(16, 12))
        ax = plt.gca()
        
        # Choose layout algorithm
        if layout == 'spring':
            pos = nx.spring_layout(self.graph, k=3, iterations=50)
        elif layout == 'circular':
            pos = nx.circular_layout(self.graph)
        elif layout == 'kamada_kawai':
            pos = nx.kamada_kawai_layout(self.graph)
        else:
            pos = nx.spring_layout(self.graph)
        
        # Draw edges
        nx.draw_networkx_edges(self.graph, pos, 
                              edge_color='#CCCCCC', 
                              width=2, 
                              alpha=0.7)
        
        # Draw nodes by vendor
        for vendor, color in self.vendor_colors.items():
            vendor_nodes = [n for n, d in self.graph.nodes(data=True) 
                           if d.get('vendor', 'Unknown') == vendor]
            if vendor_nodes:
                nx.draw_networkx_nodes(self.graph, pos,
                                     nodelist=vendor_nodes,
                                     node_color=color,
                                     node_size=800,
                                     alpha=0.8)
        
        # Add labels
        labels = {}
        for node, data in self.graph.nodes(data=True):
            hostname = data.get('hostname', node)
            if len(hostname) > 15:
                hostname = hostname[:12] + '...'
            labels[node] = f"{hostname}\n({node})"
        
        nx.draw_networkx_labels(self.graph, pos, labels, font_size=8, font_weight='bold')
        
        # Create legend
        legend_elements = []
        for vendor, color in self.vendor_colors.items():
            vendor_nodes = [n for n, d in self.graph.nodes(data=True) 
                           if d.get('vendor', 'Unknown') == vendor]
            if vendor_nodes:
                legend_elements.append(patches.Patch(color=color, label=f'{vendor} ({len(vendor_nodes)})'))
        
        plt.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1))
        
        # Add title and info
        plt.title(f"Network Topology Discovery\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                 f"Devices: {len(self.graph.nodes())} | Connections: {len(self.graph.edges())}", 
                 fontsize=14, pad=20)
        
        # Remove axis
        ax.set_axis_off()
        
        # Save the plot
        plt.tight_layout()
        plt.savefig(output_file, dpi=300, bbox_inches='tight', 
                   facecolor='white', edgecolor='none')
        plt.close()
        
        print(f"Network topology visualization saved to {output_file}")
    
    def generate_report(self, topology_data, output_file='network_report.txt'):
        """Generate text report of discovered network"""
        devices = topology_data.get('devices', {})
        connections = topology_data.get('connections', [])
        
        report = []
        report.append("NETWORK TOPOLOGY DISCOVERY REPORT")
        report.append("=" * 50)
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Total Devices: {len(devices)}")
        report.append(f"Total Connections: {len(connections)}")
        report.append("")
        
        # Device summary by vendor
        vendor_count = {}
        for device_info in devices.values():
            vendor = device_info.get('vendor', 'Unknown')
            vendor_count[vendor] = vendor_count.get(vendor, 0) + 1
        
        report.append("DEVICES BY VENDOR:")
        report.append("-" * 20)
        for vendor, count in sorted(vendor_count.items()):
            report.append(f"{vendor}: {count}")
        report.append("")
        
        # Detailed device list
        report.append("DEVICE DETAILS:")
        report.append("-" * 15)
        for ip, device_info in sorted(devices.items()):
            hostname = device_info.get('hostname', 'Unknown')
            vendor = device_info.get('vendor', 'Unknown')
            interfaces = len(device_info.get('interfaces', []))
            neighbors = len(device_info.get('neighbors', []))
            
            report.append(f"IP: {ip}")
            report.append(f"  Hostname: {hostname}")
            report.append(f"  Vendor: {vendor}")
            report.append(f"  Interfaces: {interfaces}")
            report.append(f"  LLDP Neighbors: {neighbors}")
            report.append(f"  Description: {device_info.get('system_desc', 'N/A')[:100]}")
            report.append("")
        
        # Connection details
        if connections:
            report.append("NETWORK CONNECTIONS:")
            report.append("-" * 20)
            for conn in connections:
                report.append(f"{conn.get('source_name', conn['source'])} <-> "
                            f"{conn.get('target_name', conn['target'])} ({conn.get('type', 'unknown')})")
        
        # Write report
        with open(output_file, 'w') as f:
            f.write('\n'.join(report))
        
        print(f"Network report saved to {output_file}")

def main():
    parser = argparse.ArgumentParser(description='Network Topology Visualizer')
    parser.add_argument('-i', '--input', default='topology.json', 
                       help='Input topology JSON file')
    parser.add_argument('-o', '--output', default='network_topology.png', 
                       help='Output visualization file')
    parser.add_argument('-r', '--report', default='network_report.txt', 
                       help='Output report file')
    parser.add_argument('-l', '--layout', choices=['spring', 'circular', 'kamada_kawai'],
                       default='spring', help='Graph layout algorithm')
    
    args = parser.parse_args()
    
    visualizer = TopologyVisualizer()
    topology_data = visualizer.load_topology(args.input)
    
    if topology_data:
        visualizer.build_graph(topology_data)
        visualizer.create_visualization(args.output, args.layout)
        visualizer.generate_report(topology_data, args.report)
    else:
        print("Failed to load topology data")

if __name__ == "__main__":
    main()

Create automated discovery service

Set up a systemd service and timer for automated periodic network discovery.

#!/usr/bin/env python3
import os
import sys
import logging
import yaml
from pathlib import Path

Add the virtual environment to Python path

sys.path.insert(0, '/opt/network-discovery/venv/lib/python3.*/site-packages') from network_scanner import NetworkScanner from topology_visualizer import TopologyVisualizer

Configure logging

logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('/var/log/network-discovery.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, config_file='/opt/network-discovery/config.yaml'): self.config = self.load_config(config_file) self.base_dir = Path('/opt/network-discovery') def load_config(self, config_file): """Load configuration from YAML file""" default_config = { 'networks': ['192.168.1.0/24', '10.0.1.0/24'], 'snmp_community': 'netdiscovery', 'output_dir': '/opt/network-discovery/results', 'max_workers': 50, 'generate_visualization': True, 'generate_report': True } try: with open(config_file, 'r') as f: config = yaml.safe_load(f) # Merge with defaults default_config.update(config) return default_config except FileNotFoundError: logger.warning(f"Config file {config_file} not found, using defaults") return default_config except Exception as e: logger.error(f"Error loading config: {e}") return default_config def run_discovery(self): """Run network discovery process""" logger.info("Starting network topology discovery") # Create output directory output_dir = Path(self.config['output_dir']) output_dir.mkdir(parents=True, exist_ok=True) # Initialize scanner scanner = NetworkScanner( community=self.config['snmp_community'], timeout=5, retries=1 ) # Scan networks for network in self.config['networks']: logger.info(f"Scanning network: {network}") scanner.scan_network_range(network, self.config['max_workers']) # Save results timestamp = time.strftime('%Y%m%d_%H%M%S') topology_file = output_dir / f'topology_{timestamp}.json' topology_data = scanner.save_results(str(topology_file)) # Create symlink to latest latest_link = output_dir / 'topology_latest.json' if latest_link.exists(): latest_link.unlink() latest_link.symlink_to(topology_file.name) logger.info(f"Discovery complete: {len(topology_data['devices'])} devices found") # Generate visualization if enabled if self.config.get('generate_visualization', False): self.generate_visualization(topology_data, output_dir, timestamp) # Generate report if enabled if self.config.get('generate_report', False): self.generate_report(topology_data, output_dir, timestamp) return topology_data def generate_visualization(self, topology_data, output_dir, timestamp): """Generate network visualization""" try: visualizer = TopologyVisualizer() visualizer.build_graph(topology_data) viz_file = output_dir / f'topology_{timestamp}.png' visualizer.create_visualization(str(viz_file)) # Create symlink to latest latest_link = output_dir / 'topology_latest.png' if latest_link.exists(): latest_link.unlink() latest_link.symlink_to(viz_file.name) logger.info(f"Visualization generated: {viz_file}") except Exception as e: logger.error(f"Error generating visualization: {e}") def generate_report(self, topology_data, output_dir, timestamp): """Generate network report""" try: visualizer = TopologyVisualizer() report_file = output_dir / f'report_{timestamp}.txt' visualizer.generate_report(topology_data, str(report_file)) # Create symlink to latest latest_link = output_dir / 'report_latest.txt' if latest_link.exists(): latest_link.unlink() latest_link.symlink_to(report_file.name) logger.info(f"Report generated: {report_file}") except Exception as e: logger.error(f"Error generating report: {e}") if __name__ == "__main__": import time service = DiscoveryService() try: topology_data = service.run_discovery() logger.info("Network discovery service completed successfully") except Exception as e: logger.error(f"Discovery service failed: {e}") sys.exit(1)

Create configuration file

Set up the YAML configuration file with your network ranges and discovery settings.

# Network Discovery Configuration

Networks to scan (CIDR notation)

networks: - "192.168.1.0/24" - "10.0.1.0/24" - "172.16.1.0/24"

SNMP settings

snmp_community: "netdiscovery" snmp_timeout: 5 snmp_retries: 1

Discovery settings

max_workers: 50 scan_interval_hours: 6

Output settings

output_dir: "/opt/network-discovery/results" generate_visualization: true generate_report: true keep_history_days: 30

Integration settings

prometheus_pushgateway: null grafana_webhook: null slack_webhook: null

Device filters (optional)

include_vendors: [] exclude_vendors: [] include_device_types: [] exclude_device_types: []

Set up systemd service

Create a systemd service unit for the network discovery daemon.

[Unit]
Description=Network Topology Discovery Service
After=network-online.target
Wants=network-online.target

[Service]
Type=oneshot
User=root
Group=root
WorkingDirectory=/opt/network-discovery
Environment=PATH=/opt/network-discovery/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ExecStart=/opt/network-discovery/venv/bin/python /opt/network-discovery/discovery_service.py
StandardOutput=journal
StandardError=journal
SyslogIdentifier=network-discovery

[Install]
WantedBy=multi-user.target

Create systemd timer

Set up a timer to run network discovery automatically every 6 hours.

[Unit]
Description=Network Discovery Timer
Requires=network-discovery.service

[Timer]

Run every 6 hours

OnCalendar=--* 00,06,12,18:00:00

Run 5 minutes after boot if we missed the last scheduled run

OnBootSec=5min

If the system was suspended/off, run immediately when it comes back

Persistent=true [Install] WantedBy=timers.target

Set proper permissions

Configure file ownership and permissions for the discovery service files and directories.

Never use chmod 777. It gives every user on the system full access to your files. Instead, fix ownership with chown and use minimal permissions.
# Set ownership for discovery directory
sudo chown -R root:root /opt/network-discovery

Set executable permissions for scripts

sudo chmod 755 /opt/network-discovery/*.py

Set read/write for config and data files

sudo chmod 644 /opt/network-discovery/config.yaml

Create results directory with proper permissions

sudo mkdir -p /opt/network-discovery/results sudo chmod 755 /opt/network-discovery/results

Set log file permissions

sudo touch /var/log/network-discovery.log sudo chmod 644 /var/log/network-discovery.log

Enable and start the service

Enable the systemd timer and run an initial discovery to test the setup.

# Reload systemd daemon
sudo systemctl daemon-reload

Enable and start the timer

sudo systemctl enable --now network-discovery.timer

Check timer status

sudo systemctl status network-discovery.timer

Run initial discovery manually

sudo systemctl start network-discovery.service

Check service status

sudo systemctl status network-discovery.service

Configure firewall rules

Open necessary ports for SNMP and LLDP communication in your firewall.

# Allow SNMP (UDP 161) for discovery queries
sudo ufw allow 161/udp comment 'SNMP discovery'

Allow LLDP multicast traffic

sudo ufw allow from any to 224.0.0.0/8

Check firewall status

sudo ufw status numbered
# Allow SNMP (UDP 161) for discovery queries
sudo firewall-cmd --permanent --add-port=161/udp
sudo firewall-cmd --permanent --add-rich-rule='rule family="ipv4" destination address="224.0.0.0/8" accept'

Reload firewall

sudo firewall-cmd --reload

Check firewall status

sudo firewall-cmd --list-all

Integration with monitoring systems

Create Prometheus metrics exporter

Build a Prometheus exporter to make discovery metrics available for monitoring dashboards. This integrates with existing Prometheus and Grafana monitoring setups.

#!/usr/bin/env python3
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
import threading
import logging

logger = logging.getLogger(__name__)

class NetworkDiscoveryExporter:
    def __init__(self, topology_file='/opt/network-discovery/results/topology_latest.json'):
        self.topology_file = topology_file
        self.metrics = {}
        self.last_update = 0
    
    def load_topology_data(self):
        """Load latest topology data"""
        try:
            topology_path = Path(self.topology_file)
            if not topology_path.exists():
                return {}
            
            # Check if file was updated
            file_mtime = topology_path.stat().st_mtime
            if file_mtime <= self.last_update:
                return self.metrics
            
            with open(topology_path, 'r') as f:
                topology_data = json.load(f)
            
            self.last_update = file_mtime
            self.generate_metrics(topology_data)
            return self.metrics
        
        except Exception as e:
            logger.error(f"Error loading topology data: {e}")
            return {}
    
    def generate_metrics(self, topology_data):
        """Generate Prometheus metrics from topology data"""
        devices = topology_data.get('devices', {})
        connections = topology_data.get('connections', [])
        
        # Count devices by vendor
        vendor_counts = {}
        interface_counts = {}
        
        for ip, device_info in devices.items():
            vendor = device_info.get('vendor', 'Unknown')
            vendor_counts[vendor] = vendor_counts.get(vendor, 0) + 1
            
            # Interface count per device
            interface_count = len(device_info.get('interfaces', []))
            interface_counts[ip] = interface_count
        
        # Build metrics
        self.metrics = {
            'network_discovery_devices_total': len(devices),
            'network_discovery_connections_total': len(connections),
            'network_discovery_vendors': vendor_counts,
            'network_discovery_interfaces': interface_counts,
            'network_discovery_last_scan': int(time.time())
        }
    
    def format_prometheus_metrics(self):
        """Format metrics for Prometheus"""
        self.load_topology_data()
        
        output = []
        
        # Total devices
        output.append('# HELP network_discovery_devices_total Total number of discovered devices')
        output.append('# TYPE network_discovery_devices_total gauge')
        output.append(f'network_discovery_devices_total {self.metrics.get("network_discovery_devices_total", 0)}')
        output.append('')
        
        # Total connections
        output.append('# HELP network_discovery_connections_total Total number of network connections')
        output.append('# TYPE network_discovery_connections_total gauge')
        output.append(f'network_discovery_connections_total {self.metrics.get("network_discovery_connections_total", 0)}')
        output.append('')
        
        # Devices by vendor
        output.append('# HELP network_discovery_devices_by_vendor Devices grouped by vendor')
        output.append('# TYPE network_discovery_devices_by_vendor gauge')
        for vendor, count in self.metrics.get('network_discovery_vendors', {}).items():
            output.append(f'network_discovery_devices_by_vendor{{vendor="{vendor}"}} {count}')
        output.append('')
        
        # Last scan timestamp
        output.append('# HELP network_discovery_last_scan_timestamp Unix timestamp of last scan')
        output.append('# TYPE network_discovery_last_scan_timestamp gauge')
        output.append(f'network_discovery_last_scan_timestamp {self.metrics.get("network_discovery_last_scan", 0)}')
        output.append('')
        
        return '\n'.join(output)

class PrometheusHandler(BaseHTTPRequestHandler):
    def __init__(self, *args, exporter=None, **kwargs):
        self.exporter = exporter
        super().__init__(*args, **kwargs)
    
    def do_GET(self):
        if self.path == '/metrics':
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; charset=utf-8')
            self.end_headers()
            
            metrics = self.exporter.format_prometheus_metrics()
            self.wfile.write(metrics.encode('utf-8'))
        else:
            self.send_response(404)
            self.end_headers()
    
    def log_message(self, format, *args):
        # Suppress default HTTP logging
        pass

def run_prometheus_exporter(port=9090):
    """Run Prometheus metrics exporter"""
    exporter = NetworkDiscoveryExporter()
    
    def handler(*args, **kwargs):
        PrometheusHandler(*args, exporter=exporter, **kwargs)
    
    server = HTTPServer(('0.0.0.0', port), handler)
    logger.info(f"Network Discovery Prometheus exporter running on port {port}")
    server.serve_forever()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_prometheus_exporter()

Create Grafana dashboard configuration

Set up a JSON dashboard configuration that can be imported into Grafana for network topology visualization.

{
  "dashboard": {
    "id": null,
    "title": "Network Topology Discovery",
    "tags": ["network", "topology", "snmp", "lldp"],
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "Total Discovered Devices",
        "type": "stat",
        "targets": [
          {
            "expr": "network_discovery_devices_total",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {"mode": "palette-classic"},
            "unit": "short"
          }
        },
        "gridPos": {"h": 8, "w": 6, "x": 0, "y": 0}
      },
      {
        "id": 2,
        "title": "Network Connections",
        "type": "stat",
        "targets": [
          {
            "expr": "network_discovery_connections_total",
            "refId": "A"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {"mode": "palette-classic"},
            "unit": "short"
          }
        },
        "gridPos": {"h": 8, "w": 6, "x": 6, "y": 0}
      },
      {
        "id": 3,
        "title": "Devices by Vendor",
        "type": "piechart",
        "targets": [
          {
            "expr": "network_discovery_devices_by_vendor",
            "refId": "A"
          }
        ],
        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
      },
      {
        "id": 4,
        "title": "Discovery Timeline",
        "type": "timeseries",
        "targets": [
          {
            "expr": "network_discovery_devices_total",
            "refId": "A",
            "legendFormat": "Total Devices"
          },
          {
            "expr": "network_discovery_connections_total",
            "refId": "B", 
            "legendFormat": "Connections"
          }
        ],
        "gridPos": {"h": 8, "w": 24, "x": 0, "y": 8}
      }
    ],
    "time": {
      "from": "now-24h",
      "to": "now"
    },
    "refresh": "30s"
  }
}

Verify your setup

# Check service status
sudo systemctl status network-discovery.timer
sudo systemctl status network-discovery.service

View recent discovery logs

sudo journalctl -u network-discovery.service -n 50

Check SNMP and LLDP services

sudo systemctl status snmpd lldpd

Test SNMP connectivity to a device

snmpwalk -v2c -c netdiscovery 192.168.1.1 1.3.6.1.2.1.1.5

Check LLDP neighbors

lldpctl

View discovery results

ls -la /opt/network-discovery/results/ cat /opt/network-discovery/results/report_latest.txt

Test Python dependencies

cd /opt/network-discovery source venv/bin/activate python -c "import pysnmp, networkx, matplotlib; print('All dependencies OK')"

Check firewall rules

sudo ufw status # Ubuntu/Debian sudo firewall-cmd --list-all # AlmaLinux/Rocky

You can also access the example monitoring integration by linking this setup with SNMP monitoring using Grafana dashboards for enhanced visualization capabilities.

Common issues

SymptomCauseFix
No devices discovered SNMP community mismatch or firewall blocking Verify SNMP community string and check sudo ufw status for firewall rules
Permission denied errors Incorrect file ownership or permissions Run sudo chown -R root:root /opt/network-discovery and sudo chmod 755 /opt/network-discovery/*.py
Python import errors Virtual environment not activated or missing dependencies Activate venv with source /opt/network-discovery/venv/bin/activate and reinstall packages
LLDP neighbors not found LLDP not enabled on network devices Enable LLDP on switches and routers, check with lldpctl
Visualization generation fails Missing display backend for matplotlib Install with sudo apt install python3-tk or set MPLBACKEND=Agg environment variable
Service fails to start Configuration file syntax error Check YAML syntax in /opt/network-discovery/config.yaml and view logs with journalctl -u network-discovery.service

Next steps

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We handle private cloud infrastructure for businesses that depend on uptime. From initial setup to ongoing operations.