Continuous Security Monitoring

Continuous Security Monitoring

Implement real-time security monitoring:

#!/usr/bin/env python3
# /usr/local/bin/continuous-security-monitor.py

import asyncio
import hashlib
import json
import socket
import time
from datetime import datetime

import aiohttp

class ContinuousSecurityMonitor:
    """Run a set of periodic security checks and record alerts.

    Four long-running coroutines are started by :meth:`run`: file integrity
    hashing, web-server configuration syntax checks, HTTP security-header
    checks, and a (currently stubbed) log scanner.  Every alert is kept in
    ``self.alerts``, appended as one JSON line to ``ALERT_LOG_PATH`` and
    printed to stdout.

    Attributes:
        config: Dict loaded from the JSON config file.  Only the
            ``monitored_sites`` key (a list of URLs) is read by this class.
        baseline: Maps a file path to the last SHA-256 hex digest seen.
        alerts: List of alert dicts, in the order they were raised.
    """

    # Files whose SHA-256 digest is tracked by monitor_file_integrity().
    CRITICAL_FILES = (
        '/etc/apache2/apache2.conf',
        '/etc/nginx/nginx.conf',
        '/etc/passwd',
        '/etc/shadow',
        '/var/www/html/index.html',
    )

    # (command, text expected in its output, alert message if text is absent)
    CONFIG_CHECKS = (
        ('apache2ctl -t', 'Syntax OK',
         'Apache configuration syntax error detected'),
        ('nginx -t', 'test is successful',
         'Nginx configuration syntax error detected'),
    )

    # Response headers every monitored site is expected to send.
    REQUIRED_HEADERS = (
        'Strict-Transport-Security',
        'X-Content-Type-Options',
        'X-Frame-Options',
    )

    # (regex, label) pairs intended for access-log scanning.  They are not
    # applied anywhere yet: monitor_suspicious_activity() is still a stub.
    SUSPICIOUS_PATTERNS = (
        (r'(?:union.*select|select.*from)', 'SQL Injection Attempt'),
        (r'<script|javascript:', 'XSS Attempt'),
        (r'\.\./\.\.|\.\.\\', 'Directory Traversal'),
        (r'(cmd|powershell)\.exe', 'Command Execution Attempt'),
    )

    # JSON-lines file that send_alert() appends to.
    ALERT_LOG_PATH = '/var/log/security-alerts.json'

    def __init__(self, config_file):
        """Load the JSON configuration and start with an empty state.

        Args:
            config_file: Path to a JSON file containing the configuration.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.baseline = {}
        self.alerts = []

    async def monitor_file_integrity(self):
        """Monitor critical files for unauthorized changes.

        Runs forever, hashing every file in ``CRITICAL_FILES`` every
        5 minutes.
        """
        while True:
            await self._check_file_integrity(self.CRITICAL_FILES)
            await asyncio.sleep(300)  # Check every 5 minutes

    async def _check_file_integrity(self, file_paths):
        """Hash each path once and alert on any change from the baseline.

        The first successful read of a file only records its digest.  After
        that, a differing digest raises one HIGH alert and becomes the new
        baseline, so a single modification is reported once instead of on
        every cycle.  A file that was baselined but can no longer be read
        also raises one HIGH alert and is dropped from the baseline.

        Args:
            file_paths: Iterable of file paths to check.
        """
        for file_path in file_paths:
            try:
                with open(file_path, 'rb') as f:
                    current_hash = hashlib.sha256(f.read()).hexdigest()
            except OSError as e:
                print(f"Error monitoring {file_path}: {e}")
                # A file we previously hashed disappearing (or becoming
                # unreadable) is itself an integrity event.
                if self.baseline.pop(file_path, None) is not None:
                    await self.send_alert(
                        'File Integrity Violation',
                        f'File {file_path} is no longer readable: {e}',
                        'HIGH'
                    )
                continue

            previous_hash = self.baseline.get(file_path)
            # Always track the latest digest so the next change is detected
            # relative to the current content.
            self.baseline[file_path] = current_hash
            if previous_hash is not None and previous_hash != current_hash:
                await self.send_alert(
                    'File Integrity Violation',
                    f'File {file_path} has been modified',
                    'HIGH'
                )

    async def monitor_configuration_drift(self):
        """Validate Apache and Nginx configuration syntax every 10 minutes.

        Each command in ``CONFIG_CHECKS`` is run through the shell; if its
        combined output lacks the expected success text, a CRITICAL alert
        is raised.  Runs forever.
        """
        while True:
            for command, success_marker, failure_message in self.CONFIG_CHECKS:
                output = await self.run_command(command)
                if success_marker not in output:
                    await self.send_alert(
                        'Configuration Error',
                        failure_message,
                        'CRITICAL'
                    )

            await asyncio.sleep(600)  # Check every 10 minutes

    async def monitor_security_headers(self):
        """Monitor security headers on production sites.

        Runs forever, requesting every URL in ``config['monitored_sites']``
        once per hour.
        """
        while True:
            sites = self.config.get('monitored_sites', [])
            if sites:
                # One session per sweep so connections are pooled instead of
                # building and tearing down a session for every site.
                async with aiohttp.ClientSession() as session:
                    for site in sites:
                        await self._check_site_headers(session, site)

            await asyncio.sleep(3600)  # Check hourly

    async def _check_site_headers(self, session, site):
        """Fetch one site and alert for each required header it omits.

        Args:
            session: An open ``aiohttp.ClientSession``.
            site: URL to request.
        """
        try:
            # NOTE(review): ssl=False turns off TLS certificate verification.
            # Kept to preserve existing behaviour (e.g. self-signed hosts);
            # confirm whether verification should be enabled.
            async with session.get(site, ssl=False) as response:
                missing = [
                    header for header in self.REQUIRED_HEADERS
                    if header not in response.headers
                ]
        except Exception as e:
            # Per-site boundary: any failure to fetch is reported as a LOW
            # alert and must not stop the remaining sites being checked.
            await self.send_alert(
                'Monitoring Error',
                f'Failed to check {site}: {e}',
                'LOW'
            )
            return

        for header in missing:
            await self.send_alert(
                'Missing Security Header',
                f'{header} missing from {site}',
                'MEDIUM'
            )

    async def monitor_suspicious_activity(self):
        """Placeholder for log scanning; currently only sleeps.

        No log file is read yet and ``SUSPICIOUS_PATTERNS`` is not applied.
        The coroutine just wakes once a minute, forever.
        """
        while True:
            # Monitor access logs
            # This is a simplified example - production should use inotify
            await asyncio.sleep(60)

    async def run_command(self, command):
        """Run a shell command asynchronously and return its output.

        Args:
            command: Command line passed to the shell.  Callers in this
                class pass fixed strings only; do not pass untrusted input.

        Returns:
            stdout followed by stderr, decoded as text.  Undecodable bytes
            are replaced rather than raising.
        """
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        return (stdout.decode(errors='replace')
                + stderr.decode(errors='replace'))

    async def send_alert(self, alert_type, message, severity):
        """Record, log and print a security alert.

        Args:
            alert_type: Short category, e.g. ``'Configuration Error'``.
            message: Human-readable description.
            severity: Severity label, e.g. ``'HIGH'``.

        The alert is appended to ``self.alerts`` and written as a single
        JSON line to ``ALERT_LOG_PATH``.  A failure to write the log is
        printed and does not prevent the notification being printed.
        """
        alert = {
            'timestamp': datetime.now().isoformat(),
            'type': alert_type,
            'message': message,
            'severity': severity,
            # Read directly instead of spawning `hostname`, whose output
            # carried a trailing newline into the alert record.
            'hostname': socket.gethostname()
        }

        self.alerts.append(alert)

        # Log alert: one write per record keeps each JSON line intact.
        try:
            with open(self.ALERT_LOG_PATH, 'a') as f:
                f.write(json.dumps(alert) + '\n')
        except OSError as e:
            print(f"Error writing alert log {self.ALERT_LOG_PATH}: {e}")

        # Send notification (implement email/Slack/etc)
        print(f"[{severity}] {alert_type}: {message}")

    async def run(self):
        """Run all monitoring tasks concurrently.

        Does not return under normal operation, because every monitor
        loops forever.
        """
        tasks = [
            self.monitor_file_integrity(),
            self.monitor_configuration_drift(),
            self.monitor_security_headers(),
            self.monitor_suspicious_activity()
        ]

        await asyncio.gather(*tasks)

if __name__ == '__main__':
    # Script entry point: load the system-wide config and run every
    # monitoring task until the process is stopped.
    asyncio.run(ContinuousSecurityMonitor('/etc/security-monitor.json').run())

Regular security testing and auditing form the foundation of a robust web server security program. By combining automated scanning, manual testing, and continuous monitoring, organizations can identify and remediate vulnerabilities before they're exploited. The next chapter will explore disaster recovery and security incident response procedures.

## Disaster Recovery and Security Incident Response

When security incidents occur or disasters strike, the difference between minor disruption and catastrophic loss often comes down to preparation. This chapter provides comprehensive guidance on developing disaster recovery plans for Apache and Nginx servers, establishing incident response procedures, and implementing backup strategies that ensure business continuity. We'll cover everything from automated backup systems to coordinated incident response, helping you build resilience into your web server infrastructure.