Containment and Eradication Strategies

Containment and Eradication Strategies

Containment prevents incident spread while preserving evidence for investigation. Effective containment strategies balance speed with thoroughness, ensuring threats are neutralized without destroying valuable forensic data. Eradication removes all traces of the incident, requiring careful verification to prevent reinfection.

Implement automated containment procedures:

#!/usr/bin/env python3
"""
Automated Incident Containment System
"""

import os
import sys
import subprocess
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
import asyncio
import ipaddress

class ContainmentAction:
    """Base class for containment actions"""
    
    def __init__(self, name: str, reversible: bool = True):
        self.name = name
        self.reversible = reversible
        self.logger = logging.getLogger(f"Containment.{name}")
        self.rollback_data = {}
    
    async def execute(self, target: str, params: Dict[str, Any]) -> bool:
        """Execute containment action"""
        raise NotImplementedError
    
    async def rollback(self, target: str) -> bool:
        """Rollback containment action if reversible"""
        if not self.reversible:
            self.logger.warning(f"Action {self.name} is not reversible")
            return False
        raise NotImplementedError

class NetworkIsolation(ContainmentAction):
    """Isolate system from network"""
    
    def __init__(self):
        super().__init__("NetworkIsolation", reversible=True)
    
    async def execute(self, target: str, params: Dict[str, Any]) -> bool:
        try:
            platform = params.get('platform', 'linux')
            
            if platform == 'linux':
                # Save current iptables rules
                proc = await asyncio.create_subprocess_exec(
                    'iptables-save',
                    stdout=asyncio.subprocess.PIPE
                )
                stdout, _ = await proc.communicate()
                self.rollback_data[target] = stdout.decode()
                
                # Block all traffic except management subnet
                mgmt_subnet = params.get('mgmt_subnet', '10.0.0.0/24')
                
                commands = [
                    ['iptables', '-P', 'INPUT', 'DROP'],
                    ['iptables', '-P', 'OUTPUT', 'DROP'],
                    ['iptables', '-P', 'FORWARD', 'DROP'],
                    ['iptables', '-A', 'INPUT', '-s', mgmt_subnet, '-j', 'ACCEPT'],
                    ['iptables', '-A', 'OUTPUT', '-d', mgmt_subnet, '-j', 'ACCEPT'],
                    ['iptables', '-A', 'INPUT', '-i', 'lo', '-j', 'ACCEPT'],
                    ['iptables', '-A', 'OUTPUT', '-o', 'lo', '-j', 'ACCEPT'],
                ]
                
                for cmd in commands:
                    proc = await asyncio.create_subprocess_exec(*cmd)
                    await proc.communicate()
                
                self.logger.info(f"Network isolation applied to {target}")
                return True
                
            elif platform == 'windows':
                # Windows network isolation
                commands = [
                    ['netsh', 'advfirewall', 'set', 'allprofiles', 'state', 'on'],
                    ['netsh', 'advfirewall', 'set', 'allprofiles', 'firewallpolicy', 
                     'blockinbound,blockoutbound'],
                    ['netsh', 'advfirewall', 'firewall', 'add', 'rule',
                     'name="Allow Management"', 'dir=in', 'action=allow',
                     f'remoteip={params.get("mgmt_subnet", "10.0.0.0/24")}'],
                    ['netsh', 'advfirewall', 'firewall', 'add', 'rule',
                     'name="Allow Management Out"', 'dir=out', 'action=allow',
                     f'remoteip={params.get("mgmt_subnet", "10.0.0.0/24")}']
                ]
                
                for cmd in commands:
                    proc = await asyncio.create_subprocess_exec(*cmd)
                    await proc.communicate()
                
                self.logger.info(f"Network isolation applied to {target}")
                return True
                
        except Exception as e:
            self.logger.error(f"Failed to isolate {target}: {e}")
            return False
    
    async def rollback(self, target: str) -> bool:
        try:
            if target in self.rollback_data:
                # Restore iptables rules
                proc = await asyncio.create_subprocess_exec(
                    'iptables-restore',
                    stdin=asyncio.subprocess.PIPE
                )
                await proc.communicate(self.rollback_data[target].encode())
                
                self.logger.info(f"Network isolation removed from {target}")
                return True
            
        except Exception as e:
            self.logger.error(f"Failed to rollback isolation for {target}: {e}")
            return False

class AccountDisable(ContainmentAction):
    """Disable compromised accounts"""
    
    def __init__(self):
        super().__init__("AccountDisable", reversible=True)
    
    async def execute(self, target: str, params: Dict[str, Any]) -> bool:
        try:
            account = params['account']
            platform = params.get('platform', 'linux')
            
            if platform == 'linux':
                # Backup current account state
                proc = await asyncio.create_subprocess_exec(
                    'getent', 'passwd', account,
                    stdout=asyncio.subprocess.PIPE
                )
                stdout, _ = await proc.communicate()
                self.rollback_data[f"{target}:{account}"] = stdout.decode()
                
                # Disable account
                proc = await asyncio.create_subprocess_exec(
                    'usermod', '-L', account
                )
                await proc.communicate()
                
                # Kill user sessions
                proc = await asyncio.create_subprocess_exec(
                    'pkill', '-KILL', '-u', account
                )
                await proc.communicate()
                
            elif platform == 'windows':
                # Disable AD account
                proc = await asyncio.create_subprocess_exec(
                    'powershell', '-Command',
                    f'Disable-ADAccount -Identity {account}'
                )
                await proc.communicate()
                
                # Force logoff
                proc = await asyncio.create_subprocess_exec(
                    'powershell', '-Command',
                    f'Get-CimInstance -ClassName Win32_LogonSession | Where-Object {{$_.LogonType -eq 2}} | Get-CimAssociatedInstance -ClassName Win32_LoggedOnUser | Where-Object {{$_.Name -eq "{account}"}} | ForEach-Object {{logoff $_.LogonId /server:{target}}}'
                )
                await proc.communicate()
            
            self.logger.info(f"Disabled account {account} on {target}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to disable account: {e}")
            return False

class ProcessTermination(ContainmentAction):
    """Terminate malicious processes"""
    
    def __init__(self):
        super().__init__("ProcessTermination", reversible=False)
    
    async def execute(self, target: str, params: Dict[str, Any]) -> bool:
        try:
            process_pattern = params['process_pattern']
            platform = params.get('platform', 'linux')
            
            if platform == 'linux':
                # Find and kill processes
                proc = await asyncio.create_subprocess_exec(
                    'pkill', '-f', process_pattern
                )
                await proc.communicate()
                
                # Verify termination
                proc = await asyncio.create_subprocess_exec(
                    'pgrep', '-f', process_pattern,
                    stdout=asyncio.subprocess.PIPE
                )
                stdout, _ = await proc.communicate()
                
                if stdout:
                    # Force kill if still running
                    proc = await asyncio.create_subprocess_exec(
                        'pkill', '-9', '-f', process_pattern
                    )
                    await proc.communicate()
                    
            elif platform == 'windows':
                # Windows process termination
                proc = await asyncio.create_subprocess_exec(
                    'powershell', '-Command',
                    f'Get-Process | Where-Object {{$_.ProcessName -match "{process_pattern}"}} | Stop-Process -Force'
                )
                await proc.communicate()
            
            self.logger.info(f"Terminated processes matching {process_pattern} on {target}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to terminate processes: {e}")
            return False

class ContainmentOrchestrator:
    """Orchestrate containment actions"""
    
    def __init__(self):
        self.actions = {
            'network_isolation': NetworkIsolation(),
            'account_disable': AccountDisable(),
            'process_termination': ProcessTermination(),
        }
        self.containment_log = []
        self.logger = logging.getLogger("ContainmentOrchestrator")
    
    async def contain_incident(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """Execute containment plan based on incident type"""
        
        incident_type = incident['type']
        affected_systems = incident['affected_systems']
        
        containment_plan = self._generate_containment_plan(incident_type, incident)
        
        results = {
            'incident_id': incident['id'],
            'containment_start': datetime.now().isoformat(),
            'actions_taken': [],
            'success': True
        }
        
        for action_spec in containment_plan:
            action_name = action_spec['action']
            action = self.actions.get(action_name)
            
            if not action:
                self.logger.error(f"Unknown action: {action_name}")
                continue
            
            for system in affected_systems:
                success = await action.execute(system, action_spec['params'])
                
                results['actions_taken'].append({
                    'action': action_name,
                    'target': system,
                    'success': success,
                    'timestamp': datetime.now().isoformat()
                })
                
                if not success:
                    results['success'] = False
        
        results['containment_end'] = datetime.now().isoformat()
        self.containment_log.append(results)
        
        return results
    
    def _generate_containment_plan(self, incident_type: str, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate containment plan based on incident type"""
        
        plans = {
            'malware': [
                {'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
                {'action': 'process_termination', 'params': {'process_pattern': incident.get('malware_name', 'malicious')}},
            ],
            'compromised_account': [
                {'action': 'account_disable', 'params': {'account': incident['compromised_account']}},
                {'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
            ],
            'data_breach': [
                {'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
                {'action': 'account_disable', 'params': {'account': incident.get('suspected_account', 'unknown')}},
            ],
            'ransomware': [
                {'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
                {'action': 'process_termination', 'params': {'process_pattern': '*.exe'}},
            ]
        }
        
        return plans.get(incident_type, [])
    
    async def verify_containment(self, incident_id: str) -> bool:
        """Verify containment effectiveness"""
        
        containment_record = next(
            (record for record in self.containment_log if record['incident_id'] == incident_id),
            None
        )
        
        if not containment_record:
            return False
        
        # Perform verification checks
        verification_tasks = []
        
        for action_record in containment_record['actions_taken']:
            if action_record['action'] == 'network_isolation':
                # Verify no unexpected network connections
                verification_tasks.append(
                    self._verify_network_isolation(action_record['target'])
                )
            elif action_record['action'] == 'account_disable':
                # Verify account is disabled and no active sessions
                verification_tasks.append(
                    self._verify_account_disabled(action_record['target'])
                )
        
        results = await asyncio.gather(*verification_tasks)
        
        return all(results)
    
    async def _verify_network_isolation(self, target: str) -> bool:
        """Verify network isolation is effective"""
        # Implementation depends on monitoring capabilities
        return True
    
    async def _verify_account_disabled(self, target: str) -> bool:
        """Verify account is disabled"""
        # Implementation depends on platform
        return True