Automated Incident Response
Automated Incident Response
Create automated response mechanisms:
#!/usr/bin/env python3
# /usr/local/bin/security-response.py
import ipaddress
import json
import logging
import smtplib
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta

import redis
import requests
class SecurityIncidentResponder:
    """Count security events per client IP in Redis and react at a threshold.

    Each event increments a per-(attack type, IP) counter. When the counter
    reaches the configured ``count`` within the configured ``window``, the
    configured ``action`` ('block' or 'throttle') is executed once, the
    incident is logged, and a webhook alert is sent.
    """

    # TTL (seconds) for counters of attack types that have no configured window.
    DEFAULT_WINDOW = 3600
    # While this marker (seconds) is alive, the same (attack type, IP) pair is
    # not responded to again; prevents duplicate firewall rules and alert spam.
    RESPONSE_COOLDOWN = 3600
    # Placeholder value; must be replaced with a real incoming-webhook URL.
    WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
    # Seconds to wait for the webhook before giving up.
    WEBHOOK_TIMEOUT = 5
    # nginx include file that receives one `deny` line per blocked address.
    BLOCKLIST_PATH = '/etc/nginx/blocked_ips.conf'

    _log = logging.getLogger(__name__)

    def __init__(self):
        """Connect to the local Redis instance and load the threshold table."""
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        # count: events needed to trigger; window: counting period in seconds;
        # action: response executed by respond_to_threat().
        self.threshold_config = {
            'sql_injection': {'count': 3, 'window': 300, 'action': 'block'},
            'xss': {'count': 5, 'window': 300, 'action': 'block'},
            'scanner': {'count': 2, 'window': 60, 'action': 'block'},
            'brute_force': {'count': 10, 'window': 600, 'action': 'block'},
            '404_scan': {'count': 50, 'window': 300, 'action': 'throttle'},
        }

    @staticmethod
    def _normalize_ip(value):
        """Return the canonical text form of an IP address, or None if invalid.

        The address comes from log data and later ends up in an nginx config
        file and in firewall commands, so anything that is not a plain
        IPv4/IPv6 address is rejected. Scoped IPv6 addresses ('fe80::1%eth0')
        are rejected too because the scope suffix may contain arbitrary text.
        """
        if not isinstance(value, str):
            return None
        try:
            address = ipaddress.ip_address(value.strip())
        except ValueError:
            return None
        if getattr(address, 'scope_id', None):
            return None
        return str(address)

    def process_security_event(self, event):
        """Process one security event and trigger a response if warranted.

        Args:
            event: Mapping with at least 'clientip' and 'attack_type'.
                Events lacking either field, or carrying an invalid IP
                address, are ignored.
        """
        ip = self._normalize_ip(event.get('clientip'))
        attack_type = event.get('attack_type')
        if not ip or not attack_type:
            return
        # Track event
        self.track_event(ip, attack_type)
        # Check thresholds; respond only once per cooldown period
        if self.check_threshold(ip, attack_type) and self._claim_response(ip, attack_type):
            self.respond_to_threat(ip, attack_type, event)

    def track_event(self, ip, attack_type):
        """Increment the Redis counter for this IP and attack type.

        The counter expires after the attack type's configured window
        (DEFAULT_WINDOW for unconfigured types). The TTL is only set when the
        key has none, so the window starts at the first event instead of
        being pushed forward by every new event.

        Returns:
            The counter value after the increment.
        """
        key = f"security:{attack_type}:{ip}"
        window = self.threshold_config.get(attack_type, {}).get('window', self.DEFAULT_WINDOW)
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.ttl(key)
        count, ttl = pipe.execute()
        # A negative TTL means the key has no expiry yet (freshly created).
        if ttl is None or ttl < 0:
            self.redis.expire(key, window)
        return count

    def check_threshold(self, ip, attack_type):
        """Return True if the counter for this IP has reached the threshold.

        Unknown attack types never exceed a threshold.
        """
        if attack_type not in self.threshold_config:
            return False
        config = self.threshold_config[attack_type]
        key = f"security:{attack_type}:{ip}"
        count = self.redis.get(key)
        return bool(count) and int(count) >= config['count']

    def _claim_response(self, ip, attack_type):
        """Atomically mark this (attack type, IP) as handled.

        Returns True only for the first caller within RESPONSE_COOLDOWN
        seconds, using Redis SET with NX so repeated events do not re-run
        the response.
        """
        marker = f"security:responded:{attack_type}:{ip}"
        return bool(self.redis.set(marker, 1, nx=True, ex=self.RESPONSE_COOLDOWN))

    def respond_to_threat(self, ip, attack_type, event):
        """Execute the configured action, then log the incident and alert.

        Raises:
            KeyError: If attack_type has no entry in threshold_config.
        """
        config = self.threshold_config[attack_type]
        action = config['action']
        if action == 'block':
            self.block_ip(ip, attack_type)
        elif action == 'throttle':
            self.throttle_ip(ip)
        # Log incident
        self.log_incident(ip, attack_type, action, event)
        # Send alert
        self.send_alert(ip, attack_type, action, event)

    def _run(self, command):
        """Run a command (argument list, no shell) and log any failure.

        Returns True when the command exited with status 0.
        """
        try:
            result = subprocess.run(command, check=False)
        except OSError as exc:
            self._log.error("Could not execute %s: %s", ' '.join(command), exc)
            return False
        if result.returncode != 0:
            self._log.error(
                "Command %s exited with status %s", ' '.join(command), result.returncode
            )
            return False
        return True

    def block_ip(self, ip, reason):
        """Block an IP address in ufw, the nginx deny list, and Fail2ban.

        Each layer is attempted independently; a failure in one is logged
        and does not prevent the others from running.

        Raises:
            ValueError: If ip is not a valid IPv4/IPv6 address.
        """
        safe_ip = self._normalize_ip(ip)
        if safe_ip is None:
            raise ValueError(f"refusing to block invalid IP address: {ip!r}")
        # Collapse whitespace so the reason cannot break out of the comment line.
        safe_reason = ' '.join(str(reason).split())

        # Add to firewall
        self._run(['sudo', 'ufw', 'insert', '1', 'deny', 'from', safe_ip])

        # Add to web server deny list
        try:
            with open(self.BLOCKLIST_PATH, 'a') as f:
                f.write(f"deny {safe_ip}; # {safe_reason} - {datetime.now()}\n")
        except OSError as exc:
            self._log.error("Could not update %s: %s", self.BLOCKLIST_PATH, exc)
        else:
            # Reload nginx only when the deny list actually changed
            self._run(['sudo', 'nginx', '-s', 'reload'])

        # Add to Fail2ban
        self._run(['sudo', 'fail2ban-client', 'set', 'nginx-security', 'banip', safe_ip])

    def throttle_ip(self, ip):
        """Rate-limit new connections from an IP address via a ufw limit rule.

        NOTE(review): the original code called throttle_ip() without defining
        it; a ufw `limit` rule was chosen to mirror block_ip() - confirm this
        matches the intended throttling policy.

        Raises:
            ValueError: If ip is not a valid IPv4/IPv6 address.
        """
        safe_ip = self._normalize_ip(ip)
        if safe_ip is None:
            raise ValueError(f"refusing to throttle invalid IP address: {ip!r}")
        self._run(['sudo', 'ufw', 'insert', '1', 'limit', 'from', safe_ip])

    def log_incident(self, ip, attack_type, action, event):
        """Write the incident to the module logger at WARNING level.

        Request and user agent are logged with %r so embedded newlines in
        attacker-controlled values cannot forge additional log lines.
        """
        self._log.warning(
            "Security incident: ip=%s attack_type=%s action=%s request=%r user_agent=%r",
            ip,
            attack_type,
            action,
            event.get('request', 'N/A'),
            event.get('http_user_agent', 'N/A'),
        )

    def send_alert(self, ip, attack_type, action, event):
        """Send a security alert to the configured webhook.

        Delivery failures are logged rather than raised so that an
        unreachable webhook cannot stop event processing.
        """
        message = "\n".join([
            "Security Incident Alert",
            f"Time: {datetime.now()}",
            f"IP Address: {ip}",
            f"Attack Type: {attack_type}",
            f"Action Taken: {action}",
            f"Request: {event.get('request', 'N/A')}",
            f"User Agent: {event.get('http_user_agent', 'N/A')}",
            "This is an automated security response.",
        ])
        # Email delivery is not implemented; `message` is the body meant for it.
        self._log.debug("Alert details:\n%s", message)

        # Send to Slack/webhook
        payload = {
            'text': f'Security Alert: {attack_type} from {ip} - {action} executed'
        }
        try:
            response = requests.post(
                self.WEBHOOK_URL, json=payload, timeout=self.WEBHOOK_TIMEOUT
            )
            response.raise_for_status()
        except requests.RequestException as exc:
            self._log.error("Failed to deliver webhook alert: %s", exc)
if __name__ == '__main__':
    # Entry point: replay a JSON-lines event log through the responder.
    responder = SecurityIncidentResponder()
    # Monitor log file or receive events from message queue
    # Example: Process events from log file
    with open('/var/log/security-events.json', 'r') as f:
        for line in f:
            # Keep the try body to the parsing step so errors raised while
            # handling an event are not mistaken for malformed input.
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Valid JSON is not necessarily an object (e.g. `null`, `[]`);
            # skip such lines instead of crashing on event.get().
            if not isinstance(event, dict):
                continue
            responder.process_security_event(event)
Comprehensive log monitoring and intrusion detection provide critical visibility into your web server's security posture. Regular review and tuning of detection rules ensure they remain effective against evolving threats. The next chapter will explore automated security updates and patch management to maintain your server's security over time.

## Automated Security Updates and Patch Management
Maintaining a secure web server requires constant vigilance against newly discovered vulnerabilities. Automated security updates and effective patch management form the backbone of a proactive security strategy, ensuring your Apache or Nginx server remains protected against emerging threats. This chapter provides comprehensive guidance on implementing automated update systems, managing patches across different environments, and maintaining stability while prioritizing security. We'll explore strategies that balance the need for timely updates with the requirement for service reliability.