Security Incident Response Procedures
Security Incident Response Procedures
Implement a structured incident response workflow with the following script:
#!/usr/bin/env python3
# /usr/local/bin/incident-response.py
import os
import sys
import json
import subprocess
import datetime
import hashlib
import shutil
from pathlib import Path
class IncidentResponder:
def __init__(self, base_dir="/var/incident-response"):
    """Set up the identifiers and working directories for one incident.

    The incident ID is the local start time formatted as
    ``YYYYMMDD_HHMMSS``; it names the per-incident directory created
    under *base_dir*.

    Args:
        base_dir: Root directory under which the per-incident directory
            is created. Defaults to ``/var/incident-response``, the
            location that used to be hard-coded, so existing callers
            are unaffected.

    Raises:
        OSError: If the incident directories cannot be created.
    """
    self.incident_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    self.incident_dir = f"{base_dir}/{self.incident_id}"
    self.evidence_dir = f"{self.incident_dir}/evidence"
    # Chronological record of response actions; log_action() appends to it.
    self.timeline = []
    # parents=True creates the incident directory along with the evidence
    # directory nested inside it.
    Path(self.evidence_dir).mkdir(parents=True, exist_ok=True)
def log_action(self, action, details=""):
    """Record one response action in the timeline and echo it to stdout.

    Args:
        action: Short label for the step being recorded.
        details: Optional free-form detail; printed on a second line
            only when non-empty.

    Raises:
        OSError: If ``timeline.json`` in the incident directory cannot
            be opened for writing.
    """
    stamp = datetime.datetime.now().isoformat()
    self.timeline.append(
        {"timestamp": stamp, "action": action, "details": details}
    )

    # The full timeline is rewritten on every call, so the file always
    # holds the complete list rather than an appended fragment.
    timeline_path = f"{self.incident_dir}/timeline.json"
    with open(timeline_path, "w") as handle:
        json.dump(self.timeline, handle, indent=2)

    console_lines = [f"[{stamp}] {action}"]
    if details:
        console_lines.append(f" Details: {details}")
    print("\n".join(console_lines))
def isolate_system(self, allowed_source="10.0.0.100/32"):
    """Replace the iptables ruleset with a lockdown admitting SSH from one source.

    The current ruleset is first saved to ``iptables-backup.rules`` in the
    evidence directory. The filter table is then flushed, INPUT and FORWARD
    default to DROP, and only established traffic, loopback, and TCP/22
    from *allowed_source* are accepted. Remaining inbound packets are
    logged with the ``INCIDENT_BLOCKED: `` prefix.

    Every command's outcome is checked. "System Isolated" is only logged
    when all firewall commands succeeded; otherwise the failing commands
    are recorded under "System Isolation Incomplete" so the timeline never
    claims an isolation that did not happen.

    Args:
        allowed_source: Source address (CIDR) allowed to reach SSH.
            Defaults to the previously hard-coded ``10.0.0.100/32``.

    Returns:
        None.
    """
    self.log_action("System Isolation Started")

    def attempt(argv):
        """Run *argv*; return None on success, else a failure description."""
        try:
            status = subprocess.run(argv).returncode
        except OSError as exc:
            # Raised when the binary is missing or cannot be executed.
            return f"{' '.join(argv)}: {exc}"
        if status != 0:
            return f"{' '.join(argv)}: exit status {status}"
        return None

    # Backup current firewall rules. A failed backup is reported but does
    # not stop the isolation itself.
    backup_error = attempt([
        "iptables-save",
        "-f", f"{self.evidence_dir}/iptables-backup.rules"
    ])
    if backup_error:
        self.log_action("Firewall Backup Failed", backup_error)

    # Block all incoming connections except SSH from the allowed source
    commands = [
        # Flush existing rules
        ["iptables", "-F"],
        ["iptables", "-X"],
        # Default policies
        ["iptables", "-P", "INPUT", "DROP"],
        ["iptables", "-P", "FORWARD", "DROP"],
        ["iptables", "-P", "OUTPUT", "ACCEPT"],
        # Allow established connections
        ["iptables", "-A", "INPUT", "-m", "state", "--state", "ESTABLISHED,RELATED", "-j", "ACCEPT"],
        # Allow SSH from incident response team only
        ["iptables", "-A", "INPUT", "-p", "tcp", "--dport", "22", "-s", allowed_source, "-j", "ACCEPT"],
        # Allow loopback
        ["iptables", "-A", "INPUT", "-i", "lo", "-j", "ACCEPT"],
        # Log dropped packets
        ["iptables", "-A", "INPUT", "-j", "LOG", "--log-prefix", "INCIDENT_BLOCKED: "],
    ]
    # NOTE(review): only IPv4 (iptables) is touched here; ip6tables rules
    # are left as they were -- confirm whether IPv6 needs the same lockdown.
    failures = []
    for cmd in commands:
        error = attempt(cmd)
        if error:
            failures.append(error)

    if failures:
        self.log_action("System Isolation Incomplete", "; ".join(failures))
        return

    # Drop a "/32" suffix for display so the default message is unchanged.
    shown = allowed_source[:-3] if allowed_source.endswith("/32") else allowed_source
    self.log_action("System Isolated", f"Firewall rules applied, only SSH from {shown} allowed")
def collect_evidence(self):
    """Collect forensic evidence into the incident's evidence directory.

    In order, this writes: ``system-info.json`` (hostname, date, uptime,
    kernel, logged-in users, process tree), the output of ``netstat`` and
    ``ss``, ``ps auxf`` and ``lsof -n``, a ``strings`` pass over
    ``/proc/kcore`` when that file exists, a listing of files modified in
    the last 7 days, and copies of the Apache/nginx log directories plus
    ``auth.log`` and ``syslog`` when present.

    Collection is best-effort: a tool that is not installed or a log that
    cannot be copied is recorded in the timeline and the remaining steps
    still run, so one missing utility does not cost the rest of the
    evidence. Output files are closed as soon as each command finishes.

    Returns:
        None.
    """
    self.log_action("Evidence Collection Started")

    def capture(argv, filename, **run_kwargs):
        """Run *argv* with stdout written to *filename* in the evidence dir."""
        target = f"{self.evidence_dir}/{filename}"
        try:
            # The with-block closes the output file once the command ends.
            with open(target, "w") as sink:
                subprocess.run(argv, stdout=sink, **run_kwargs)
        except OSError as exc:
            # Covers a missing/unexecutable tool and an unwritable target.
            self.log_action(
                "Evidence Command Failed",
                f"{' '.join(argv)}: {exc}"
            )

    # System information
    self.log_action("Collecting System Information")
    system_info = {
        "hostname": subprocess.getoutput("hostname"),
        "date": subprocess.getoutput("date"),
        "uptime": subprocess.getoutput("uptime"),
        "kernel": subprocess.getoutput("uname -a"),
        "users": subprocess.getoutput("who"),
        "processes": subprocess.getoutput("ps auxf")
    }
    with open(f"{self.evidence_dir}/system-info.json", "w") as f:
        json.dump(system_info, f, indent=2)

    # Network connections
    self.log_action("Collecting Network Connections")
    capture(["netstat", "-tulpan"], "netstat.txt")
    capture(["ss", "-tulpan"], "ss.txt")

    # Running processes
    self.log_action("Collecting Process Information")
    capture(["ps", "auxf"], "processes.txt")
    capture(["lsof", "-n"], "open-files.txt")

    # Memory dump (if available)
    if os.path.exists("/proc/kcore"):
        self.log_action("Creating Memory Sample")
        # Note: Full memory dump requires special tools
        # NOTE(review): this runs without a time limit; scanning
        # /proc/kcore may take very long -- consider bounding it.
        capture(["strings", "/proc/kcore"], "memory-strings.txt")

    # File system timeline
    self.log_action("Creating File System Timeline")
    capture(
        ["find", "/", "-type", "f", "-mtime", "-7", "-ls"],
        "recent-files.txt",
        stderr=subprocess.DEVNULL
    )

    # Web server and system logs (directories and single files)
    self.log_action("Collecting Web Server Logs")
    log_sources = [
        "/var/log/apache2",
        "/var/log/nginx",
        "/var/log/auth.log",
        "/var/log/syslog"
    ]
    for source in log_sources:
        if not os.path.exists(source):
            continue
        destination = f"{self.evidence_dir}/{os.path.basename(source)}"
        try:
            if os.path.isdir(source):
                shutil.copytree(source, destination)
            else:
                shutil.copy2(source, destination)
        except OSError as exc:
            # shutil.Error is an OSError subclass, so partial directory
            # copies are reported here as well.
            self.log_action("Log Copy Failed", f"{source}: {exc}")

    self.log_action("Evidence Collection Completed")
def analyze_compromise(self):
    """Run quick heuristics for signs of compromise and save the findings.

    Checks performed:

    * Process listing: each watch-list keyword is matched as a whole
      word in the ``ps aux`` output. Plain substring matching would flag
      "nc" inside unrelated words such as "sync" or "irqbalance".
    * SSH: every ``authorized_keys`` under ``/root/.ssh`` and
      ``/home/*/.ssh`` is logged to the timeline (not added to findings).
    * Critical files: ``/etc/passwd``, ``/etc/shadow``, ``/etc/sudoers``
      and ``/etc/crontab`` are reported with their SHA-256 when modified
      less than 7 days ago. An unreadable file is logged and skipped
      instead of aborting the analysis.
    * Apache: a finding is added when the grep over ``/etc/apache2``
      mentions "proxy".

    The findings are written to ``findings.json`` in the incident
    directory.

    Returns:
        list[str]: Human-readable findings, possibly empty.
    """
    # Function-scope imports keep this method self-contained.
    import glob
    import re

    self.log_action("Compromise Analysis Started")
    findings = []

    # Check for suspicious processes
    suspicious_processes = [
        "nc", "ncat", "cryptominer", "xmrig",
        "backdoor", "reverse", "shell"
    ]
    ps_output = subprocess.getoutput("ps aux")
    # Split into alphanumeric words so "/usr/bin/nc" and "reverse_shell.py"
    # still match, while "nc" buried inside "rsync" does not.
    seen_words = set(re.findall(r"[a-z0-9]+", ps_output.lower()))
    for proc in suspicious_processes:
        if proc in seen_words:
            findings.append(f"Suspicious process found: {proc}")

    # Check for unauthorized SSH keys. glob replaces the former shell
    # `ls -d`, so directory names containing whitespace are kept intact.
    ssh_dirs = ["/root/.ssh", "/home/*/.ssh"]
    for pattern in ssh_dirs:
        for ssh_dir in sorted(glob.glob(pattern)):
            if os.path.exists(f"{ssh_dir}/authorized_keys"):
                self.log_action(
                    "Checking SSH Keys",
                    f"Found authorized_keys in {ssh_dir}"
                )

    # Check for modified system files
    critical_files = [
        "/etc/passwd",
        "/etc/shadow",
        "/etc/sudoers",
        "/etc/crontab"
    ]
    now = datetime.datetime.now()
    for path in critical_files:
        try:
            with open(path, 'rb') as f:
                file_hash = hashlib.sha256(f.read()).hexdigest()
            mtime = datetime.datetime.fromtimestamp(os.path.getmtime(path))
        except FileNotFoundError:
            # Not every system has every file; absence is not a finding.
            continue
        except OSError as exc:
            self.log_action("Critical File Unreadable", f"{path}: {exc}")
            continue
        if (now - mtime).days < 7:
            findings.append(
                f"Recently modified critical file: {path} "
                f"(modified: {mtime}, hash: {file_hash})"
            )

    # Check web server configurations
    if os.path.exists("/etc/apache2"):
        apache_configs = subprocess.getoutput(
            "grep -r 'ProxyPass\\|Include\\|LoadModule' /etc/apache2/"
        )
        # NOTE(review): grep -r prefixes each match with its file path, so
        # a file merely named like "proxy.load" presumably triggers this
        # too -- confirm whether that is intended.
        if "proxy" in apache_configs.lower():
            findings.append("Proxy configuration found in Apache")

    # Save findings
    with open(f"{self.incident_dir}/findings.json", "w") as f:
        json.dump(findings, f, indent=2)
    self.log_action("Compromise Analysis Completed", f"Found {len(findings)} issues")
    return findings
def contain_threat(self):
    """Stop exposed services, cron, and www-data processes to contain a threat.

    Each of ``apache2``, ``nginx``, ``mysql`` and ``postgresql`` that
    ``systemctl is-active`` reports as "active" is stopped. The ``cron``
    service is then stopped and every process owned by ``www-data`` is
    killed with SIGKILL.

    Outcomes are checked: "Cron Disabled" is logged only when the stop
    command succeeded, and a service that could not be queried or stopped
    is recorded in the timeline instead of being silently ignored.

    Returns:
        None.
    """
    self.log_action("Threat Containment Started")

    def succeeded(argv):
        """Return True when *argv* could be started and exited with 0."""
        try:
            return subprocess.run(argv).returncode == 0
        except OSError:
            # Binary missing or not executable.
            return False

    # Stop network-facing and database services for investigation
    services_to_stop = [
        "apache2", "nginx", "mysql", "postgresql"
    ]
    for service in services_to_stop:
        try:
            result = subprocess.run(
                ["systemctl", "is-active", service],
                capture_output=True,
                text=True
            )
        except OSError as exc:
            self.log_action("Service Check Failed", f"{service}: {exc}")
            continue
        if result.stdout.strip() == "active":
            self.log_action(
                "Stopping Service",
                f"Stopping {service} for investigation"
            )
            if not succeeded(["systemctl", "stop", service]):
                self.log_action(
                    "Service Stop Failed",
                    f"Could not stop {service}"
                )

    # Kill suspicious processes
    # (In practice, be more selective)

    # Disable cron jobs temporarily
    if succeeded(["systemctl", "stop", "cron"]):
        self.log_action("Cron Disabled", "Cron service stopped")
    else:
        self.log_action("Cron Disable Failed", "systemctl stop cron did not succeed")

    # Kill every process owned by the web server account. The exit status
    # is deliberately not checked: presumably non-zero just means nothing
    # matched -- verify against pkill(1).
    try:
        subprocess.run(["pkill", "-KILL", "-u", "www-data"])
    except OSError as exc:
        self.log_action("Process Kill Failed", f"pkill www-data: {exc}")

    self.log_action("Threat Containment Completed")
def create_recovery_plan(self):
    """Write the fixed seven-step recovery plan to disk and return it.

    The plan holds the incident ID, a creation timestamp, and a list of
    prioritised steps, each naming the responsible team. It is saved as
    ``recovery-plan.json`` in the incident directory.

    Returns:
        dict: The plan with keys ``incident_id``, ``created``, ``steps``.
    """
    self.log_action("Creating Recovery Plan")

    # (action, responsible team) in execution order; priorities are the
    # 1-based positions in this table.
    ordered_steps = (
        ("Verify all backups are intact and recent", "Operations Team"),
        ("Rebuild affected systems from known-good images", "Infrastructure Team"),
        ("Restore data from verified backups", "Operations Team"),
        ("Apply all security patches and updates", "Security Team"),
        ("Reset all credentials and certificates", "Security Team"),
        ("Implement additional monitoring", "Security Team"),
        ("Conduct post-incident review", "All Teams"),
    )

    plan = {
        "incident_id": self.incident_id,
        "created": datetime.datetime.now().isoformat(),
        "steps": [
            {"priority": rank, "action": task, "responsible": team}
            for rank, (task, team) in enumerate(ordered_steps, start=1)
        ],
    }

    plan_path = f"{self.incident_dir}/recovery-plan.json"
    with open(plan_path, "w") as out:
        json.dump(plan, out, indent=2)

    self.log_action("Recovery Plan Created")
    return plan
def generate_report(self):
"""Generate incident report"""
self.log_action("Generating Incident Report")
report = f"""
# Security Incident Report