Containment and Eradication Strategies
Containment and Eradication Strategies
Containment prevents incident spread while preserving evidence for investigation. Effective containment strategies balance speed with thoroughness, ensuring threats are neutralized without destroying valuable forensic data. Eradication removes all traces of the incident, requiring careful verification to prevent reinfection.
Implement automated containment procedures:
#!/usr/bin/env python3
"""
Automated Incident Containment System
"""
import os
import sys
import subprocess
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
import asyncio
import ipaddress
class ContainmentAction:
"""Base class for containment actions"""
def __init__(self, name: str, reversible: bool = True):
self.name = name
self.reversible = reversible
self.logger = logging.getLogger(f"Containment.{name}")
self.rollback_data = {}
async def execute(self, target: str, params: Dict[str, Any]) -> bool:
"""Execute containment action"""
raise NotImplementedError
async def rollback(self, target: str) -> bool:
"""Rollback containment action if reversible"""
if not self.reversible:
self.logger.warning(f"Action {self.name} is not reversible")
return False
raise NotImplementedError
class NetworkIsolation(ContainmentAction):
"""Isolate system from network"""
def __init__(self):
super().__init__("NetworkIsolation", reversible=True)
async def execute(self, target: str, params: Dict[str, Any]) -> bool:
try:
platform = params.get('platform', 'linux')
if platform == 'linux':
# Save current iptables rules
proc = await asyncio.create_subprocess_exec(
'iptables-save',
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
self.rollback_data[target] = stdout.decode()
# Block all traffic except management subnet
mgmt_subnet = params.get('mgmt_subnet', '10.0.0.0/24')
commands = [
['iptables', '-P', 'INPUT', 'DROP'],
['iptables', '-P', 'OUTPUT', 'DROP'],
['iptables', '-P', 'FORWARD', 'DROP'],
['iptables', '-A', 'INPUT', '-s', mgmt_subnet, '-j', 'ACCEPT'],
['iptables', '-A', 'OUTPUT', '-d', mgmt_subnet, '-j', 'ACCEPT'],
['iptables', '-A', 'INPUT', '-i', 'lo', '-j', 'ACCEPT'],
['iptables', '-A', 'OUTPUT', '-o', 'lo', '-j', 'ACCEPT'],
]
for cmd in commands:
proc = await asyncio.create_subprocess_exec(*cmd)
await proc.communicate()
self.logger.info(f"Network isolation applied to {target}")
return True
elif platform == 'windows':
# Windows network isolation
commands = [
['netsh', 'advfirewall', 'set', 'allprofiles', 'state', 'on'],
['netsh', 'advfirewall', 'set', 'allprofiles', 'firewallpolicy',
'blockinbound,blockoutbound'],
['netsh', 'advfirewall', 'firewall', 'add', 'rule',
'name="Allow Management"', 'dir=in', 'action=allow',
f'remoteip={params.get("mgmt_subnet", "10.0.0.0/24")}'],
['netsh', 'advfirewall', 'firewall', 'add', 'rule',
'name="Allow Management Out"', 'dir=out', 'action=allow',
f'remoteip={params.get("mgmt_subnet", "10.0.0.0/24")}']
]
for cmd in commands:
proc = await asyncio.create_subprocess_exec(*cmd)
await proc.communicate()
self.logger.info(f"Network isolation applied to {target}")
return True
except Exception as e:
self.logger.error(f"Failed to isolate {target}: {e}")
return False
async def rollback(self, target: str) -> bool:
try:
if target in self.rollback_data:
# Restore iptables rules
proc = await asyncio.create_subprocess_exec(
'iptables-restore',
stdin=asyncio.subprocess.PIPE
)
await proc.communicate(self.rollback_data[target].encode())
self.logger.info(f"Network isolation removed from {target}")
return True
except Exception as e:
self.logger.error(f"Failed to rollback isolation for {target}: {e}")
return False
class AccountDisable(ContainmentAction):
"""Disable compromised accounts"""
def __init__(self):
super().__init__("AccountDisable", reversible=True)
async def execute(self, target: str, params: Dict[str, Any]) -> bool:
try:
account = params['account']
platform = params.get('platform', 'linux')
if platform == 'linux':
# Backup current account state
proc = await asyncio.create_subprocess_exec(
'getent', 'passwd', account,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
self.rollback_data[f"{target}:{account}"] = stdout.decode()
# Disable account
proc = await asyncio.create_subprocess_exec(
'usermod', '-L', account
)
await proc.communicate()
# Kill user sessions
proc = await asyncio.create_subprocess_exec(
'pkill', '-KILL', '-u', account
)
await proc.communicate()
elif platform == 'windows':
# Disable AD account
proc = await asyncio.create_subprocess_exec(
'powershell', '-Command',
f'Disable-ADAccount -Identity {account}'
)
await proc.communicate()
# Force logoff
proc = await asyncio.create_subprocess_exec(
'powershell', '-Command',
f'Get-CimInstance -ClassName Win32_LogonSession | Where-Object {{$_.LogonType -eq 2}} | Get-CimAssociatedInstance -ClassName Win32_LoggedOnUser | Where-Object {{$_.Name -eq "{account}"}} | ForEach-Object {{logoff $_.LogonId /server:{target}}}'
)
await proc.communicate()
self.logger.info(f"Disabled account {account} on {target}")
return True
except Exception as e:
self.logger.error(f"Failed to disable account: {e}")
return False
class ProcessTermination(ContainmentAction):
"""Terminate malicious processes"""
def __init__(self):
super().__init__("ProcessTermination", reversible=False)
async def execute(self, target: str, params: Dict[str, Any]) -> bool:
try:
process_pattern = params['process_pattern']
platform = params.get('platform', 'linux')
if platform == 'linux':
# Find and kill processes
proc = await asyncio.create_subprocess_exec(
'pkill', '-f', process_pattern
)
await proc.communicate()
# Verify termination
proc = await asyncio.create_subprocess_exec(
'pgrep', '-f', process_pattern,
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
if stdout:
# Force kill if still running
proc = await asyncio.create_subprocess_exec(
'pkill', '-9', '-f', process_pattern
)
await proc.communicate()
elif platform == 'windows':
# Windows process termination
proc = await asyncio.create_subprocess_exec(
'powershell', '-Command',
f'Get-Process | Where-Object {{$_.ProcessName -match "{process_pattern}"}} | Stop-Process -Force'
)
await proc.communicate()
self.logger.info(f"Terminated processes matching {process_pattern} on {target}")
return True
except Exception as e:
self.logger.error(f"Failed to terminate processes: {e}")
return False
class ContainmentOrchestrator:
"""Orchestrate containment actions"""
def __init__(self):
self.actions = {
'network_isolation': NetworkIsolation(),
'account_disable': AccountDisable(),
'process_termination': ProcessTermination(),
}
self.containment_log = []
self.logger = logging.getLogger("ContainmentOrchestrator")
async def contain_incident(self, incident: Dict[str, Any]) -> Dict[str, Any]:
"""Execute containment plan based on incident type"""
incident_type = incident['type']
affected_systems = incident['affected_systems']
containment_plan = self._generate_containment_plan(incident_type, incident)
results = {
'incident_id': incident['id'],
'containment_start': datetime.now().isoformat(),
'actions_taken': [],
'success': True
}
for action_spec in containment_plan:
action_name = action_spec['action']
action = self.actions.get(action_name)
if not action:
self.logger.error(f"Unknown action: {action_name}")
continue
for system in affected_systems:
success = await action.execute(system, action_spec['params'])
results['actions_taken'].append({
'action': action_name,
'target': system,
'success': success,
'timestamp': datetime.now().isoformat()
})
if not success:
results['success'] = False
results['containment_end'] = datetime.now().isoformat()
self.containment_log.append(results)
return results
def _generate_containment_plan(self, incident_type: str, incident: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate containment plan based on incident type"""
plans = {
'malware': [
{'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
{'action': 'process_termination', 'params': {'process_pattern': incident.get('malware_name', 'malicious')}},
],
'compromised_account': [
{'action': 'account_disable', 'params': {'account': incident['compromised_account']}},
{'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
],
'data_breach': [
{'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
{'action': 'account_disable', 'params': {'account': incident.get('suspected_account', 'unknown')}},
],
'ransomware': [
{'action': 'network_isolation', 'params': {'mgmt_subnet': '10.0.0.0/24'}},
{'action': 'process_termination', 'params': {'process_pattern': '*.exe'}},
]
}
return plans.get(incident_type, [])
async def verify_containment(self, incident_id: str) -> bool:
"""Verify containment effectiveness"""
containment_record = next(
(record for record in self.containment_log if record['incident_id'] == incident_id),
None
)
if not containment_record:
return False
# Perform verification checks
verification_tasks = []
for action_record in containment_record['actions_taken']:
if action_record['action'] == 'network_isolation':
# Verify no unexpected network connections
verification_tasks.append(
self._verify_network_isolation(action_record['target'])
)
elif action_record['action'] == 'account_disable':
# Verify account is disabled and no active sessions
verification_tasks.append(
self._verify_account_disabled(action_record['target'])
)
results = await asyncio.gather(*verification_tasks)
return all(results)
async def _verify_network_isolation(self, target: str) -> bool:
"""Verify network isolation is effective"""
# Implementation depends on monitoring capabilities
return True
async def _verify_account_disabled(self, target: str) -> bool:
"""Verify account is disabled"""
# Implementation depends on platform
return True