Continuous Vulnerability Monitoring

Continuous Vulnerability Monitoring

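Implement ongoing vulnerability scanning with a script such as the one below. The script performs a single scan per run, so schedule it (for example, from cron or a systemd timer) to make the monitoring continuous: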

#!/usr/bin/env python3
# /usr/local/bin/vulnerability-scanner.py

import json
import socket
import ssl
import subprocess
import sys
from datetime import datetime, timezone
from urllib.parse import urlparse

import requests

class VulnerabilityScanner:
    """Run a small set of non-intrusive security checks against one web target.

    Every ``check_*`` method appends finding dicts with the keys ``type``,
    ``severity`` and ``details`` to ``self.vulnerabilities``. ``scan()`` runs
    all checks and returns the dict built by ``generate_report()``.
    """

    def __init__(self, target_url):
        """Store the target and start with an empty list of findings.

        Args:
            target_url: Absolute URL of the site to scan, including the
                scheme (for example ``https://example.com``).
        """
        self.target_url = target_url
        self.vulnerabilities = []

    def check_ssl_vulnerabilities(self):
        """Record weak ciphers or outdated TLS versions negotiated with the target.

        Only the cipher and protocol that this client and the server actually
        negotiate are inspected. Connection or handshake failures are printed
        and otherwise ignored so the remaining checks can still run.
        """
        parsed = urlparse(self.target_url)
        hostname = parsed.hostname
        if not hostname:
            # Without a scheme urlparse() yields no hostname, and
            # create_connection((None, port)) would silently probe loopback.
            print(f"SSL check error: no hostname in target URL {self.target_url!r}")
            return
        port = parsed.port or 443

        try:
            context = ssl.create_default_context()
            context.set_ciphers('ALL:@SECLEVEL=0')  # Allow all ciphers for testing
            # The default context refuses anything below TLS 1.2 on current
            # Python versions, which would make the outdated-protocol check
            # below unreachable. Lower the floor so legacy servers can be seen.
            context.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED

            with socket.create_connection((hostname, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cipher_name = ssock.cipher()[0]
                    tls_version = ssock.version()
        except Exception as e:
            print(f"SSL check error: {e}")
            return

        if 'RC4' in cipher_name or 'DES' in cipher_name:
            self.vulnerabilities.append({
                'type': 'Weak Cipher',
                'severity': 'HIGH',
                'details': f'Weak cipher in use: {cipher_name}'
            })

        if tls_version in ('TLSv1', 'TLSv1.1'):
            self.vulnerabilities.append({
                'type': 'Outdated TLS',
                'severity': 'MEDIUM',
                'details': f'Outdated TLS version: {tls_version}'
            })

    def check_security_headers(self):
        """Record missing security headers and server version disclosure.

        Fetches ``self.target_url`` once and inspects the response headers.
        Request failures are printed and otherwise ignored.
        """
        required_headers = {
            'Strict-Transport-Security': 'HSTS header missing',
            'X-Content-Type-Options': 'X-Content-Type-Options header missing',
            'X-Frame-Options': 'X-Frame-Options header missing',
            'Content-Security-Policy': 'CSP header missing',
            'X-XSS-Protection': 'XSS protection header missing'
        }

        try:
            response = requests.get(self.target_url, timeout=10)
            headers = response.headers

            for header, message in required_headers.items():
                if header not in headers:
                    self.vulnerabilities.append({
                        'type': 'Missing Security Header',
                        'severity': 'MEDIUM',
                        'details': message
                    })

            # Check for information disclosure: a "/" in an Apache or nginx
            # Server header means a product/version token is being sent.
            server = headers.get('Server', '')
            if ('Apache' in server or 'nginx' in server) and '/' in server:
                self.vulnerabilities.append({
                    'type': 'Information Disclosure',
                    'severity': 'LOW',
                    'details': f'Server version disclosed: {server}'
                })

        except Exception as e:
            print(f"Header check error: {e}")

    def check_common_vulnerabilities(self):
        """Record well-known sensitive paths that the target serves with HTTP 200.

        A path only counts as exposed when the final URL (after any redirects)
        still ends with the probed path; this avoids reporting a redirect to a
        login or home page as an exposed file. Request failures are printed
        and the remaining paths are still probed.
        """
        test_paths = [
            '/.git/HEAD',
            '/.svn/entries',
            '/.env',
            '/wp-config.php',
            '/phpinfo.php',
            '/.htaccess',
            '/.htpasswd',
            '/server-status',
            '/server-info'
        ]

        base_url = self.target_url.rstrip('/')
        for path in test_paths:
            try:
                response = requests.get(base_url + path, timeout=5)
            except Exception as e:
                # Best effort: report the failure and keep probing. Unlike a
                # bare "except:", this lets Ctrl-C and SystemExit propagate.
                print(f"Path check error for {path}: {e}")
                continue

            final_path = urlparse(response.url).path.rstrip('/')
            if response.status_code == 200 and final_path.endswith(path):
                self.vulnerabilities.append({
                    'type': 'Sensitive File Exposure',
                    'severity': 'HIGH',
                    'details': f'Sensitive file accessible: {path}'
                })

    def generate_report(self):
        """Build the report dict for the findings collected so far.

        Returns:
            A dict with the keys ``target``, ``scan_date`` (UTC timestamp in
            ISO 8601 format), ``vulnerabilities_found``, ``vulnerabilities``
            and ``summary`` (finding counts keyed by ``critical``, ``high``,
            ``medium`` and ``low``).
        """
        severities = [v['severity'] for v in self.vulnerabilities]

        return {
            'target': self.target_url,
            # Generated in-process rather than by shelling out to `date`, so
            # the value is portable and independent of the system locale.
            'scan_date': datetime.now(timezone.utc).isoformat(),
            'vulnerabilities_found': len(self.vulnerabilities),
            'vulnerabilities': self.vulnerabilities,
            # Categorize by severity
            'summary': {
                level.lower(): severities.count(level)
                for level in ('CRITICAL', 'HIGH', 'MEDIUM', 'LOW')
            },
        }

    def scan(self):
        """Run all vulnerability checks and return the generated report."""
        print(f"Scanning {self.target_url}...")
        self.check_ssl_vulnerabilities()
        self.check_security_headers()
        self.check_common_vulnerabilities()
        return self.generate_report()

def main(argv=None):
    """Scan one target, print the JSON report, and save it to disk.

    Args:
        argv: Optional list of command-line arguments (defaults to
            ``sys.argv[1:]``). The first argument is the target URL
            (default ``https://example.com``); the second is the report
            path (default ``/var/log/vulnerability-scan.json``).

    Returns:
        0 on success, 1 if the report file could not be written.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    target_url = args[0] if args else 'https://example.com'
    output_path = args[1] if len(args) > 1 else '/var/log/vulnerability-scan.json'

    scanner = VulnerabilityScanner(target_url)
    report = scanner.scan()

    print(json.dumps(report, indent=2))

    # Save report. Writing under /var/log usually needs elevated privileges,
    # so report a failed write clearly instead of ending in a traceback.
    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)
    except OSError as e:
        print(f"Could not save report to {output_path}: {e}", file=sys.stderr)
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())

Understanding and preventing common vulnerabilities requires continuous vigilance and a multi-layered approach. Regular security audits, automated scanning, and prompt patching form the foundation of a robust security posture. The next chapter will explore security testing and audit procedures to ensure your defenses remain effective.