Network and Firewall Audit

Network and Firewall Audit

Examining network-level controls ensures SSH services are properly protected by firewalls and network security measures. This audit phase verifies that network defenses complement SSH security configurations.

Perform network security audit:

#!/usr/bin/env python3
# ssh-network-audit.py
# Audit network security for SSH services

import socket
import subprocess
import ipaddress
import nmap
import json
from datetime import datetime

class SSHNetworkAuditor:
    def __init__(self, target_hosts):
        self.target_hosts = target_hosts
        self.findings = []
        self.nm = nmap.PortScanner()
        
    def scan_ssh_ports(self):
        """Scan for SSH services on non-standard ports"""
        print("Scanning for SSH services...")
        
        for host in self.target_hosts:
            try:
                # Scan common SSH ports
                self.nm.scan(host, '22,2222,22222,2022', '-sV')
                
                for port in self.nm[host]['tcp']:
                    if self.nm[host]['tcp'][port]['state'] == 'open':
                        service = self.nm[host]['tcp'][port].get('name', 'unknown')
                        version = self.nm[host]['tcp'][port].get('version', 'unknown')
                        
                        if 'ssh' in service.lower():
                            finding = {
                                'host': host,
                                'port': port,
                                'service': service,
                                'version': version
                            }
                            
                            # Check for vulnerabilities
                            if port != 22:
                                finding['note'] = 'SSH on non-standard port'
                            
                            # Check version
                            if 'OpenSSH' in version:
                                ssh_version = version.split()[1]
                                if self.is_vulnerable_version(ssh_version):
                                    finding['vulnerability'] = 'Outdated SSH version'
                                    
                            self.findings.append(finding)
                            
            except Exception as e:
                print(f"Error scanning {host}: {str(e)}")
                
    def is_vulnerable_version(self, version):
        """Check if SSH version has known vulnerabilities"""
        try:
            major, minor = map(int, version.split('.')[:2])
            
            # OpenSSH versions before 7.4 have various vulnerabilities
            if major < 7 or (major == 7 and minor < 4):
                return True
                
        except:
            pass
            
        return False
        
    def audit_firewall_rules(self):
        """Audit firewall rules for SSH"""
        print("Auditing firewall rules...")
        
        firewall_findings = []
        
        # Check iptables rules
        try:
            # Get INPUT chain rules
            result = subprocess.run(['iptables', '-L', 'INPUT', '-n', '-v'],
                                  capture_output=True, text=True)
            
            if result.returncode == 0:
                rules = result.stdout.splitlines()
                ssh_rules = [r for r in rules if '22' in r or 'ssh' in r.lower()]
                
                # Analyze SSH rules
                for rule in ssh_rules:
                    if 'ACCEPT' in rule and '0.0.0.0/0' in rule:
                        firewall_findings.append({
                            'type': 'overly_permissive',
                            'rule': rule.strip(),
                            'severity': 'HIGH'
                        })
                        
                # Check for rate limiting
                rate_limit_found = any('limit' in r.lower() for r in ssh_rules)
                if not rate_limit_found:
                    firewall_findings.append({
                        'type': 'no_rate_limiting',
                        'severity': 'MEDIUM'
                    })
                    
        except Exception as e:
            firewall_findings.append({
                'type': 'error',
                'message': f'Failed to audit iptables: {str(e)}'
            })
            
        return firewall_findings
        
    def test_connection_limits(self, host, port=22):
        """Test connection rate limits"""
        print(f"Testing connection limits for {host}:{port}...")
        
        connections = []
        max_connections = 20
        
        try:
            for i in range(max_connections):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(2)
                
                result = sock.connect_ex((host, port))
                if result == 0:
                    connections.append(sock)
                else:
                    return {
                        'host': host,
                        'max_successful': i,
                        'limit_enforced': True
                    }
                    
            # If we got here, no limit was hit
            for sock in connections:
                sock.close()
                
            return {
                'host': host,
                'max_successful': max_connections,
                'limit_enforced': False,
                'severity': 'HIGH'
            }
            
        except Exception as e:
            return {
                'host': host,
                'error': str(e)
            }
            
    def check_fail2ban_status(self):
        """Check if Fail2ban is protecting SSH"""
        print("Checking Fail2ban status...")
        
        try:
            # Check if fail2ban is running
            result = subprocess.run(['systemctl', 'is-active', 'fail2ban'],
                                  capture_output=True, text=True)
            
            if result.stdout.strip() != 'active':
                return {'status': 'inactive', 'severity': 'HIGH'}
                
            # Check SSH jail status
            result = subprocess.run(['fail2ban-client', 'status', 'sshd'],
                                  capture_output=True, text=True)
            
            if result.returncode == 0:
                # Parse output
                status = {}
                for line in result.stdout.splitlines():
                    if 'Currently failed:' in line:
                        status['failed_ips'] = int(line.split(':')[1].strip())
                    elif 'Total banned:' in line:
                        status['total_banned'] = int(line.split(':')[1].strip())
                        
                return {'status': 'active', 'jail_status': status}
            else:
                return {'status': 'active', 'jail_status': 'unknown'}
                
        except Exception as e:
            return {'status': 'error', 'message': str(e)}
            
    def audit_network_exposure(self):
        """Check SSH exposure to internet"""
        print("Checking network exposure...")
        
        exposure_findings = []
        
        try:
            # Get default route to determine external interface
            result = subprocess.run(['ip', 'route', 'show', 'default'],
                                  capture_output=True, text=True)
            
            if result.returncode == 0:
                external_interface = result.stdout.split()[4]
                
                # Get IP address of external interface
                result = subprocess.run(['ip', 'addr', 'show', external_interface],
                                      capture_output=True, text=True)
                
                # Check if SSH is listening on external interface
                netstat_result = subprocess.run(['netstat', '-tlnp'],
                                              capture_output=True, text=True)
                
                if '0.0.0.0:22' in netstat_result.stdout:
                    exposure_findings.append({
                        'type': 'listening_all_interfaces',
                        'severity': 'MEDIUM',
                        'recommendation': 'Bind SSH to specific interfaces only'
                    })
                    
        except Exception as e:
            exposure_findings.append({
                'type': 'error',
                'message': f'Failed to check exposure: {str(e)}'
            })
            
        return exposure_findings
        
    def generate_network_report(self):
        """Generate network audit report"""
        report = {
            'audit_date': datetime.now().isoformat(),
            'ssh_services': self.findings,
            'firewall_audit': self.audit_firewall_rules(),
            'fail2ban_status': self.check_fail2ban_status(),
            'network_exposure': self.audit_network_exposure()
        }
        
        # Test connection limits for each host
        report['connection_limits'] = []
        for host in self.target_hosts:
            limit_test = self.test_connection_limits(host)
            report['connection_limits'].append(limit_test)
            
        return report

# Example usage
if __name__ == '__main__':
    # Define target hosts
    targets = ['192.168.1.10', '192.168.1.20', '192.168.1.30']
    
    auditor = SSHNetworkAuditor(targets)
    auditor.scan_ssh_ports()
    
    report = auditor.generate_network_report()
    
    with open('ssh_network_audit.json', 'w') as f:
        json.dump(report, f, indent=2)
        
    print("\nNetwork audit complete. Report saved to ssh_network_audit.json")