Security Incident Response Procedures

Security Incident Response Procedures

Implement structured incident response:

#!/usr/bin/env python3
# /usr/local/bin/incident-response.py

import os
import sys
import json
import subprocess
import datetime
import hashlib
import shutil
from pathlib import Path

class IncidentResponder:
    def __init__(self):
        self.incident_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.incident_dir = f"/var/incident-response/{self.incident_id}"
        self.evidence_dir = f"{self.incident_dir}/evidence"
        self.timeline = []
        
        # Create incident directories
        Path(self.evidence_dir).mkdir(parents=True, exist_ok=True)
        
    def log_action(self, action, details=""):
        """Log all incident response actions"""
        timestamp = datetime.datetime.now().isoformat()
        entry = {
            "timestamp": timestamp,
            "action": action,
            "details": details
        }
        self.timeline.append(entry)
        
        # Write to log file
        with open(f"{self.incident_dir}/timeline.json", "w") as f:
            json.dump(self.timeline, f, indent=2)
            
        print(f"[{timestamp}] {action}")
        if details:
            print(f"  Details: {details}")
            
    def isolate_system(self):
        """Isolate the system to prevent further damage"""
        self.log_action("System Isolation Started")
        
        # Backup current firewall rules
        subprocess.run([
            "iptables-save", 
            "-f", f"{self.evidence_dir}/iptables-backup.rules"
        ])
        
        # Block all incoming connections except SSH from specific IP
        commands = [
            # Flush existing rules
            ["iptables", "-F"],
            ["iptables", "-X"],
            
            # Default policies
            ["iptables", "-P", "INPUT", "DROP"],
            ["iptables", "-P", "FORWARD", "DROP"],
            ["iptables", "-P", "OUTPUT", "ACCEPT"],
            
            # Allow established connections
            ["iptables", "-A", "INPUT", "-m", "state", "--state", "ESTABLISHED,RELATED", "-j", "ACCEPT"],
            
            # Allow SSH from incident response team only
            ["iptables", "-A", "INPUT", "-p", "tcp", "--dport", "22", "-s", "10.0.0.100/32", "-j", "ACCEPT"],
            
            # Allow loopback
            ["iptables", "-A", "INPUT", "-i", "lo", "-j", "ACCEPT"],
            
            # Log dropped packets
            ["iptables", "-A", "INPUT", "-j", "LOG", "--log-prefix", "INCIDENT_BLOCKED: "],
        ]
        
        for cmd in commands:
            subprocess.run(cmd)
            
        self.log_action("System Isolated", "Firewall rules applied, only SSH from 10.0.0.100 allowed")
        
    def collect_evidence(self):
        """Collect forensic evidence"""
        self.log_action("Evidence Collection Started")
        
        # System information
        self.log_action("Collecting System Information")
        system_info = {
            "hostname": subprocess.getoutput("hostname"),
            "date": subprocess.getoutput("date"),
            "uptime": subprocess.getoutput("uptime"),
            "kernel": subprocess.getoutput("uname -a"),
            "users": subprocess.getoutput("who"),
            "processes": subprocess.getoutput("ps auxf")
        }
        
        with open(f"{self.evidence_dir}/system-info.json", "w") as f:
            json.dump(system_info, f, indent=2)
            
        # Network connections
        self.log_action("Collecting Network Connections")
        subprocess.run([
            "netstat", "-tulpan"
        ], stdout=open(f"{self.evidence_dir}/netstat.txt", "w"))
        
        subprocess.run([
            "ss", "-tulpan"
        ], stdout=open(f"{self.evidence_dir}/ss.txt", "w"))
        
        # Running processes
        self.log_action("Collecting Process Information")
        subprocess.run([
            "ps", "auxf"
        ], stdout=open(f"{self.evidence_dir}/processes.txt", "w"))
        
        subprocess.run([
            "lsof", "-n"
        ], stdout=open(f"{self.evidence_dir}/open-files.txt", "w"))
        
        # Memory dump (if available)
        if os.path.exists("/proc/kcore"):
            self.log_action("Creating Memory Sample")
            # Note: Full memory dump requires special tools
            subprocess.run([
                "strings", "/proc/kcore"
            ], stdout=open(f"{self.evidence_dir}/memory-strings.txt", "w"))
            
        # File system timeline
        self.log_action("Creating File System Timeline")
        subprocess.run([
            "find", "/", "-type", "f", "-mtime", "-7",
            "-ls"
        ], stdout=open(f"{self.evidence_dir}/recent-files.txt", "w"), 
           stderr=subprocess.DEVNULL)
        
        # Web server logs
        self.log_action("Collecting Web Server Logs")
        log_dirs = [
            "/var/log/apache2",
            "/var/log/nginx",
            "/var/log/auth.log",
            "/var/log/syslog"
        ]
        
        for log_dir in log_dirs:
            if os.path.exists(log_dir):
                if os.path.isdir(log_dir):
                    shutil.copytree(
                        log_dir, 
                        f"{self.evidence_dir}/{os.path.basename(log_dir)}"
                    )
                else:
                    shutil.copy2(
                        log_dir,
                        f"{self.evidence_dir}/{os.path.basename(log_dir)}"
                    )
                    
        self.log_action("Evidence Collection Completed")
        
    def analyze_compromise(self):
        """Analyze the compromise"""
        self.log_action("Compromise Analysis Started")
        
        findings = []
        
        # Check for suspicious processes
        suspicious_processes = [
            "nc", "ncat", "cryptominer", "xmrig", 
            "backdoor", "reverse", "shell"
        ]
        
        ps_output = subprocess.getoutput("ps aux")
        for proc in suspicious_processes:
            if proc in ps_output.lower():
                findings.append(f"Suspicious process found: {proc}")
                
        # Check for unauthorized SSH keys
        ssh_dirs = ["/root/.ssh", "/home/*/.ssh"]
        for pattern in ssh_dirs:
            for ssh_dir in subprocess.getoutput(f"ls -d {pattern} 2>/dev/null").split():
                if os.path.exists(f"{ssh_dir}/authorized_keys"):
                    self.log_action(
                        "Checking SSH Keys",
                        f"Found authorized_keys in {ssh_dir}"
                    )
                    
        # Check for modified system files
        critical_files = [
            "/etc/passwd",
            "/etc/shadow",
            "/etc/sudoers",
            "/etc/crontab"
        ]
        
        for file in critical_files:
            if os.path.exists(file):
                # Get file hash
                with open(file, 'rb') as f:
                    file_hash = hashlib.sha256(f.read()).hexdigest()
                    
                # Check modification time
                mtime = datetime.datetime.fromtimestamp(
                    os.path.getmtime(file)
                )
                
                if (datetime.datetime.now() - mtime).days < 7:
                    findings.append(
                        f"Recently modified critical file: {file} "
                        f"(modified: {mtime}, hash: {file_hash})"
                    )
                    
        # Check web server configurations
        if os.path.exists("/etc/apache2"):
            apache_configs = subprocess.getoutput(
                "grep -r 'ProxyPass\\|Include\\|LoadModule' /etc/apache2/"
            )
            if "proxy" in apache_configs.lower():
                findings.append("Proxy configuration found in Apache")
                
        # Save findings
        with open(f"{self.incident_dir}/findings.json", "w") as f:
            json.dump(findings, f, indent=2)
            
        self.log_action("Compromise Analysis Completed", f"Found {len(findings)} issues")
        
        return findings
        
    def contain_threat(self):
        """Contain the identified threat"""
        self.log_action("Threat Containment Started")
        
        # Stop suspicious services
        suspicious_services = [
            "apache2", "nginx", "mysql", "postgresql"
        ]
        
        for service in suspicious_services:
            result = subprocess.run(
                ["systemctl", "is-active", service],
                capture_output=True,
                text=True
            )
            
            if result.stdout.strip() == "active":
                self.log_action(
                    f"Stopping Service",
                    f"Stopping {service} for investigation"
                )
                subprocess.run(["systemctl", "stop", service])
                
        # Kill suspicious processes
        # (In practice, be more selective)
        
        # Disable cron jobs temporarily
        subprocess.run(["systemctl", "stop", "cron"])
        self.log_action("Cron Disabled", "Cron service stopped")
        
        # Revoke all current sessions
        subprocess.run(["pkill", "-KILL", "-u", "www-data"])
        
        self.log_action("Threat Containment Completed")
        
    def create_recovery_plan(self):
        """Create a recovery plan based on findings"""
        self.log_action("Creating Recovery Plan")
        
        plan = {
            "incident_id": self.incident_id,
            "created": datetime.datetime.now().isoformat(),
            "steps": [
                {
                    "priority": 1,
                    "action": "Verify all backups are intact and recent",
                    "responsible": "Operations Team"
                },
                {
                    "priority": 2,
                    "action": "Rebuild affected systems from known-good images",
                    "responsible": "Infrastructure Team"
                },
                {
                    "priority": 3,
                    "action": "Restore data from verified backups",
                    "responsible": "Operations Team"
                },
                {
                    "priority": 4,
                    "action": "Apply all security patches and updates",
                    "responsible": "Security Team"
                },
                {
                    "priority": 5,
                    "action": "Reset all credentials and certificates",
                    "responsible": "Security Team"
                },
                {
                    "priority": 6,
                    "action": "Implement additional monitoring",
                    "responsible": "Security Team"
                },
                {
                    "priority": 7,
                    "action": "Conduct post-incident review",
                    "responsible": "All Teams"
                }
            ]
        }
        
        with open(f"{self.incident_dir}/recovery-plan.json", "w") as f:
            json.dump(plan, f, indent=2)
            
        self.log_action("Recovery Plan Created")
        
        return plan
        
    def generate_report(self):
        """Generate incident report"""
        self.log_action("Generating Incident Report")
        
        report = f"""
# Security Incident Report