Network and Firewall Audit
Network and Firewall Audit
Examining network-level controls ensures SSH services are properly protected by firewalls and network security measures. This audit phase verifies that network defenses complement SSH security configurations.
Run the following script to perform the network security audit:
#!/usr/bin/env python3
# ssh-network-audit.py
# Audit network security for SSH services
import ipaddress
import json
import re
import socket
import subprocess
from datetime import datetime

import nmap
class SSHNetworkAuditor:
    """Audit the network-level controls that protect SSH services.

    Combines an nmap service scan of the target hosts with checks run on the
    local machine (iptables rules, Fail2ban status, listening sockets) and a
    simple connection-limit probe, then assembles the results into one report.
    """

    # Ports probed for SSH daemons: the default port plus common alternates.
    SSH_SCAN_PORTS = '22,2222,22222,2022'

    # OpenSSH releases older than this (major, minor) are reported as outdated.
    MIN_OPENSSH_VERSION = (7, 4)

    # First "major.minor" pair in a version string such as "7.2p2".
    _VERSION_RE = re.compile(r'(\d+)\.(\d+)')

    # Destination-port specs as printed by `iptables -L -n`:
    # "dpt:22", "dpts:20:25" or "multiport dports 22,80".
    _PORT_SPEC_RE = re.compile(r'\b(?:dpts?:|dports\s+)([\d:,]+)')

    def __init__(self, target_hosts):
        """Store the hosts to audit and create the nmap scanner.

        Args:
            target_hosts: Iterable of host addresses. A single host given as
                a plain string is wrapped in a list so it is not iterated
                character by character.
        """
        if isinstance(target_hosts, str):
            target_hosts = [target_hosts]
        self.target_hosts = target_hosts
        self.findings = []
        self.nm = nmap.PortScanner()

    def scan_ssh_ports(self):
        """Scan every target host for SSH services on common ports.

        Each open port whose detected service name contains "ssh" is appended
        to ``self.findings`` with its host, port, service and version. Ports
        other than 22 get a ``note``; outdated OpenSSH releases get a
        ``vulnerability`` entry. Scan errors are printed, not raised.
        """
        print("Scanning for SSH services...")
        for host in self.target_hosts:
            try:
                self.nm.scan(host, self.SSH_SCAN_PORTS, '-sV')
                try:
                    tcp_ports = self.nm[host]['tcp']
                except KeyError:
                    # No entry for the host (or no TCP section) in the result.
                    print(f"No TCP scan results for {host}")
                    continue

                for port, info in tcp_ports.items():
                    if info.get('state') != 'open':
                        continue
                    service = info.get('name', 'unknown')
                    if 'ssh' not in service.lower():
                        continue
                    version = info.get('version', 'unknown')
                    product = info.get('product', '')

                    finding = {
                        'host': host,
                        'port': port,
                        'service': service,
                        'version': version,
                    }
                    if port != 22:
                        finding['note'] = 'SSH on non-standard port'

                    # The daemon name may be reported in either the product
                    # or the version field, so look at both.
                    is_openssh = 'openssh' in f"{product} {version}".lower()
                    if is_openssh and self.is_vulnerable_version(version):
                        finding['vulnerability'] = 'Outdated SSH version'

                    self.findings.append(finding)
            except Exception as e:
                print(f"Error scanning {host}: {str(e)}")

    def is_vulnerable_version(self, version):
        """Return True if *version* is an OpenSSH release older than 7.4.

        The first ``major.minor`` pair found in the string is used, so
        patch-level suffixes and surrounding text ("7.2p2",
        "OpenSSH 6.6.1p1 Ubuntu") are accepted. Strings without a
        recognisable version, and non-string values, return False.
        """
        if not isinstance(version, str):
            return False
        match = self._VERSION_RE.search(version)
        if match is None:
            return False
        major, minor = int(match.group(1)), int(match.group(2))
        return (major, minor) < self.MIN_OPENSSH_VERSION

    @classmethod
    def _rule_targets_ssh(cls, rule, port=22):
        """Return True if an iptables listing line refers to the SSH port."""
        lowered = rule.lower()
        if 'ssh' in lowered:
            return True
        for match in cls._PORT_SPEC_RE.finditer(lowered):
            for spec in match.group(1).split(','):
                low, _, high = spec.partition(':')
                try:
                    if int(low) <= port <= int(high or low):
                        return True
                except ValueError:
                    # Malformed or empty port spec; ignore it.
                    continue
        return False

    @classmethod
    def _analyze_iptables_rules(cls, listing):
        """Turn `iptables -L INPUT -n -v` output into a list of findings."""
        findings = []
        ssh_rules = [line for line in listing.splitlines()
                     if cls._rule_targets_ssh(line)]

        for rule in ssh_rules:
            # Verbose listing columns:
            # pkts bytes target prot opt in out source destination ...
            fields = rule.split()
            if (len(fields) >= 9 and fields[2] == 'ACCEPT'
                    and fields[7] == '0.0.0.0/0'):
                findings.append({
                    'type': 'overly_permissive',
                    'rule': rule.strip(),
                    'severity': 'HIGH'
                })

        if not any('limit' in rule.lower() for rule in ssh_rules):
            findings.append({
                'type': 'no_rate_limiting',
                'severity': 'MEDIUM'
            })
        return findings

    def audit_firewall_rules(self):
        """Audit the local iptables INPUT chain for SSH-related weaknesses.

        Returns:
            A list of finding dicts: ``overly_permissive`` for ACCEPT rules
            on the SSH port whose source is 0.0.0.0/0, ``no_rate_limiting``
            when no SSH rule mentions a limit, or ``error`` when iptables
            cannot be run or exits with a non-zero status.
        """
        print("Auditing firewall rules...")
        try:
            result = subprocess.run(['iptables', '-L', 'INPUT', '-n', '-v'],
                                    capture_output=True, text=True)
        except Exception as e:
            return [{
                'type': 'error',
                'message': f'Failed to audit iptables: {str(e)}'
            }]

        if result.returncode != 0:
            # Report the failure instead of returning an empty, clean-looking
            # result.
            detail = result.stderr.strip()
            return [{
                'type': 'error',
                'message': ('Failed to audit iptables: exit status '
                            f'{result.returncode}: {detail}')
            }]
        return self._analyze_iptables_rules(result.stdout)

    def test_connection_limits(self, host, port=22):
        """Open up to 20 simultaneous TCP connections to probe for a limit.

        Returns:
            A dict with ``host`` plus either ``max_successful`` and
            ``limit_enforced`` (with ``severity`` 'HIGH' when all connections
            succeeded), or ``error`` when the first connection already fails
            or an exception occurs. All sockets are closed before returning.
        """
        print(f"Testing connection limits for {host}:{port}...")
        connections = []
        max_connections = 20
        try:
            for i in range(max_connections):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Track the socket right away so it is closed on every path.
                connections.append(sock)
                sock.settimeout(2)
                if sock.connect_ex((host, port)) != 0:
                    if i == 0:
                        # Nothing connected at all: the service is not
                        # reachable, which says nothing about limits.
                        return {
                            'host': host,
                            'max_successful': 0,
                            'error': f'Unable to connect to {host}:{port}'
                        }
                    return {
                        'host': host,
                        'max_successful': i,
                        'limit_enforced': True
                    }
            # Every connection succeeded, so no limit was hit.
            return {
                'host': host,
                'max_successful': max_connections,
                'limit_enforced': False,
                'severity': 'HIGH'
            }
        except Exception as e:
            return {
                'host': host,
                'error': str(e)
            }
        finally:
            for sock in connections:
                sock.close()

    def check_fail2ban_status(self):
        """Check whether Fail2ban is active and query its ``sshd`` jail.

        Returns:
            ``{'status': 'inactive', 'severity': 'HIGH'}`` when the service
            is not active; ``{'status': 'active', 'jail_status': ...}`` with
            the parsed counters, or 'unknown' if the jail query fails; or
            ``{'status': 'error', 'message': ...}`` on an exception.
        """
        print("Checking Fail2ban status...")
        try:
            result = subprocess.run(['systemctl', 'is-active', 'fail2ban'],
                                    capture_output=True, text=True)
            if result.stdout.strip() != 'active':
                return {'status': 'inactive', 'severity': 'HIGH'}

            result = subprocess.run(['fail2ban-client', 'status', 'sshd'],
                                    capture_output=True, text=True)
            if result.returncode != 0:
                return {'status': 'active', 'jail_status': 'unknown'}

            # Pull the counters out of "<label>: <number>" lines.
            status = {}
            for line in result.stdout.splitlines():
                if 'Currently failed:' in line:
                    status['failed_ips'] = int(line.split(':')[1].strip())
                elif 'Total banned:' in line:
                    status['total_banned'] = int(line.split(':')[1].strip())
            return {'status': 'active', 'jail_status': status}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}

    @staticmethod
    def _wildcard_ssh_listeners(netstat_output, port=22):
        """Return local addresses listening on *port* on all interfaces.

        Parses `netstat -tlnp` output and compares the port exactly, so a
        listener on 2222 is not mistaken for one on 22. Both IPv4
        ("0.0.0.0") and IPv6 ("::") wildcard addresses are recognised.
        """
        wildcard_addresses = ('0.0.0.0', '::', '[::]', '*')
        listeners = []
        for line in netstat_output.splitlines():
            fields = line.split()
            # Columns: Proto Recv-Q Send-Q Local-Address Foreign-Address ...
            if len(fields) < 4 or not fields[0].startswith('tcp'):
                continue
            address, _, local_port = fields[3].rpartition(':')
            if local_port == str(port) and address in wildcard_addresses:
                listeners.append(fields[3])
        return listeners

    def audit_network_exposure(self):
        """Check whether SSH listens on every interface of this machine.

        Returns:
            A list containing a ``listening_all_interfaces`` finding when a
            TCP listener on port 22 is bound to a wildcard address, or an
            ``error`` finding when netstat cannot be run or fails.
        """
        print("Checking network exposure...")
        exposure_findings = []
        try:
            netstat_result = subprocess.run(['netstat', '-tlnp'],
                                            capture_output=True, text=True)
            if netstat_result.returncode != 0:
                detail = netstat_result.stderr.strip()
                exposure_findings.append({
                    'type': 'error',
                    'message': ('Failed to check exposure: netstat exit '
                                f'status {netstat_result.returncode}: '
                                f'{detail}')
                })
            elif self._wildcard_ssh_listeners(netstat_result.stdout):
                exposure_findings.append({
                    'type': 'listening_all_interfaces',
                    'severity': 'MEDIUM',
                    'recommendation': 'Bind SSH to specific interfaces only'
                })
        except Exception as e:
            exposure_findings.append({
                'type': 'error',
                'message': f'Failed to check exposure: {str(e)}'
            })
        return exposure_findings

    def generate_network_report(self):
        """Run the remaining audits and assemble the full report dict.

        The report contains the audit timestamp, the SSH services already
        collected in ``self.findings``, the firewall, Fail2ban and exposure
        results, and one connection-limit result per target host.
        """
        report = {
            'audit_date': datetime.now().isoformat(),
            'ssh_services': self.findings,
            'firewall_audit': self.audit_firewall_rules(),
            'fail2ban_status': self.check_fail2ban_status(),
            'network_exposure': self.audit_network_exposure()
        }
        report['connection_limits'] = [
            self.test_connection_limits(host) for host in self.target_hosts
        ]
        return report
# Example usage
def main():
    """Scan the sample hosts, build the audit report and save it as JSON."""
    # Hosts to audit
    sample_hosts = ['192.168.1.10', '192.168.1.20', '192.168.1.30']

    ssh_auditor = SSHNetworkAuditor(sample_hosts)
    ssh_auditor.scan_ssh_ports()
    audit_report = ssh_auditor.generate_network_report()

    with open('ssh_network_audit.json', 'w') as report_file:
        json.dump(audit_report, report_file, indent=2)

    print("\nNetwork audit complete. Report saved to ssh_network_audit.json")


if __name__ == '__main__':
    main()