Automated Incident Response

Automated Incident Response

Create automated response mechanisms:

#!/usr/bin/env python3
# /usr/local/bin/security-response.py

import json
import subprocess
import smtplib
import logging
from datetime import datetime, timedelta
from collections import defaultdict
import redis
import requests

class SecurityIncidentResponder:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.threshold_config = {
            'sql_injection': {'count': 3, 'window': 300, 'action': 'block'},
            'xss': {'count': 5, 'window': 300, 'action': 'block'},
            'scanner': {'count': 2, 'window': 60, 'action': 'block'},
            'brute_force': {'count': 10, 'window': 600, 'action': 'block'},
            '404_scan': {'count': 50, 'window': 300, 'action': 'throttle'}
        }
        
    def process_security_event(self, event):
        """Process security event and determine response"""
        ip = event.get('clientip')
        attack_type = event.get('attack_type')
        
        if not ip or not attack_type:
            return
        
        # Track event
        self.track_event(ip, attack_type)
        
        # Check thresholds
        if self.check_threshold(ip, attack_type):
            self.respond_to_threat(ip, attack_type, event)
    
    def track_event(self, ip, attack_type):
        """Track security events in Redis"""
        key = f"security:{attack_type}:{ip}"
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, 3600)  # 1 hour expiry
        pipe.execute()
    
    def check_threshold(self, ip, attack_type):
        """Check if threshold is exceeded"""
        if attack_type not in self.threshold_config:
            return False
        
        config = self.threshold_config[attack_type]
        key = f"security:{attack_type}:{ip}"
        count = self.redis.get(key)
        
        if count and int(count) >= config['count']:
            return True
        return False
    
    def respond_to_threat(self, ip, attack_type, event):
        """Execute response action"""
        config = self.threshold_config[attack_type]
        action = config['action']
        
        if action == 'block':
            self.block_ip(ip, attack_type)
        elif action == 'throttle':
            self.throttle_ip(ip)
        
        # Log incident
        self.log_incident(ip, attack_type, action, event)
        
        # Send alert
        self.send_alert(ip, attack_type, action, event)
    
    def block_ip(self, ip, reason):
        """Block IP address"""
        # Add to firewall
        subprocess.run(['sudo', 'ufw', 'insert', '1', 'deny', 'from', ip])
        
        # Add to web server deny list
        with open('/etc/nginx/blocked_ips.conf', 'a') as f:
            f.write(f"deny {ip}; # {reason} - {datetime.now()}\n")
        
        # Reload nginx
        subprocess.run(['sudo', 'nginx', '-s', 'reload'])
        
        # Add to Fail2ban
        subprocess.run(['sudo', 'fail2ban-client', 'set', 'nginx-security', 'banip', ip])
    
    def send_alert(self, ip, attack_type, action, event):
        """Send security alert"""
        message = f"""
Security Incident Alert

Time: {datetime.now()}
IP Address: {ip}
Attack Type: {attack_type}
Action Taken: {action}
Request: {event.get('request', 'N/A')}
User Agent: {event.get('http_user_agent', 'N/A')}

This is an automated security response.
        """
        
        # Send email
        # self.send_email('[email protected]', 'Security Incident', message)
        
        # Send to Slack/webhook
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        requests.post(webhook_url, json={
            'text': f'Security Alert: {attack_type} from {ip} - {action} executed'
        })

if __name__ == '__main__':
    responder = SecurityIncidentResponder()
    
    # Monitor log file or receive events from message queue
    # Example: Process events from log file
    with open('/var/log/security-events.json', 'r') as f:
        for line in f:
            try:
                event = json.loads(line)
                responder.process_security_event(event)
            except json.JSONDecodeError:
                continue

Comprehensive log monitoring and intrusion detection provide critical visibility into your web server's security posture. Regular review and tuning of detection rules ensure they remain effective against evolving threats. The next chapter will explore automated security updates and patch management to maintain your server's security over time.## Automated Security Updates and Patch Management

Maintaining a secure web server requires constant vigilance against newly discovered vulnerabilities. Automated security updates and effective patch management form the backbone of a proactive security strategy, ensuring your Apache or Nginx server remains protected against emerging threats. This chapter provides comprehensive guidance on implementing automated update systems, managing patches across different environments, and maintaining stability while prioritizing security. We'll explore strategies that balance the need for timely updates with the requirement for service reliability.