Building an Incident Response Framework
Effective incident response requires more than technical procedures: it demands organizational readiness, clear communication channels, and practiced responses. The incident response framework must address the particular challenges of data breaches, where every minute of delay widens exposure and consumes time from fixed regulatory notification windows. Unlike general security incidents, data breaches often carry legal obligations, regulatory notification requirements, and reputational considerations that complicate response efforts.
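Because notification clocks typically start at detection, response tooling often computes deadlines up front. The sketch below is a minimal illustration of that idea; the obligation names and time windows are simplified assumptions, not legal guidance.
# Illustrative sketch: deriving notification deadlines from the detection time.
# Obligation names and windows are simplified assumptions, not legal guidance.
from datetime import datetime, timedelta, timezone
from typing import Dict, List

NOTIFICATION_WINDOWS: Dict[str, timedelta] = {
    'GDPR_AUTHORITY': timedelta(hours=72),    # assumed supervisory-authority window
    'HIPAA_INDIVIDUALS': timedelta(days=60),  # assumed individual-notification window
    'STATE_BREACH_LAW': timedelta(days=30),   # placeholder; varies by jurisdiction
}

def notification_deadlines(detected_at: datetime,
                           obligations: List[str]) -> Dict[str, datetime]:
    """Map each applicable obligation to its notification deadline."""
    return {
        name: detected_at + NOTIFICATION_WINDOWS[name]
        for name in obligations
        if name in NOTIFICATION_WINDOWS
    }

deadlines = notification_deadlines(
    datetime.now(timezone.utc), ['GDPR_AUTHORITY', 'HIPAA_INDIVIDUALS']
)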
Modern incident response frameworks adopt an assume-breach mentality, preparing for incidents as inevitable rather than merely preventable events. This mindset shift drives proactive preparation, comprehensive playbooks, and regular drills that ensure readiness. Response teams must balance speed with thoroughness, containing breaches quickly while preserving evidence for investigation and legal proceedings.
The framework must scale from minor incidents to major breaches without overwhelming response capabilities. Tiered response structures allocate resources based on incident severity, ensuring appropriate attention without overreacting to minor events. Clear escalation procedures prevent delays when incidents exceed initial assessments. Regular reviews ensure the framework evolves with changing threats and organizational growth.
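As a minimal sketch of that tiering, the mapping below pairs each severity level with an assumed escalation group and response-time target; the tier names, roles, and targets are placeholders. The more complete example that follows builds the same severity levels into an end-to-end orchestrator.
# Illustrative sketch: severity-driven response tiers with placeholder targets.
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict, Tuple

@dataclass(frozen=True)
class ResponseTier:
    name: str
    escalation_roles: Tuple[str, ...]
    target_response: timedelta

RESPONSE_TIERS: Dict[int, ResponseTier] = {
    1: ResponseTier('monitor', ('on-call engineer',), timedelta(hours=4)),
    2: ResponseTier('standard', ('on-call engineer', 'security lead'), timedelta(hours=1)),
    3: ResponseTier('major', ('security lead', 'legal', 'privacy officer'), timedelta(minutes=30)),
    4: ResponseTier('crisis', ('CISO', 'legal', 'privacy officer', 'communications'), timedelta(minutes=15)),
}

def tier_for_severity(severity_level: int) -> ResponseTier:
    """Escalate to the crisis tier for anything outside the defined range."""
    return RESPONSE_TIERS.get(severity_level, RESPONSE_TIERS[4])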
# Example: Comprehensive incident response system for data breaches
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from enum import Enum, IntEnum
import logging
from cryptography.fernet import Fernet
class IncidentSeverity(IntEnum):  # IntEnum so severities can be compared with >=
    LOW = 1        # Minor issue, no data exposure
    MEDIUM = 2     # Limited data exposure, contained
    HIGH = 3       # Significant data exposure
    CRITICAL = 4   # Major breach, regulatory impact
class IncidentPhase(Enum):
    DETECTION = "detection"
    TRIAGE = "triage"
    CONTAINMENT = "containment"
    ERADICATION = "eradication"
    RECOVERY = "recovery"
    POST_INCIDENT = "post_incident"
@dataclass
class DataBreachIncident:
    incident_id: str
    detected_at: datetime
    severity: IncidentSeverity
    affected_systems: List[str]
    data_categories: List[str]
    estimated_records: int
    attack_vector: Optional[str]
    threat_actor: Optional[str]
    current_phase: IncidentPhase
    containment_status: str
    regulatory_requirements: List[str]
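
# ContainmentResult is returned by the orchestrator's containment step below but is
# not defined in the original listing; this minimal dataclass is an assumed sketch.
@dataclass
class ContainmentResult:
    success: bool
    actions: List[Dict]
    remaining_risks: List[str]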

class IncidentResponseOrchestrator:
    def __init__(self, config):
        self.config = config
        self.detection_system = BreachDetectionSystem()
        self.containment_engine = ContainmentEngine()
        self.forensics_collector = ForensicsCollector()
        self.recovery_manager = RecoveryManager()
        self.notification_system = NotificationSystem()
        self.logger = self._setup_secure_logging()

    async def handle_potential_breach(self, alert: Dict):
        """Orchestrate response to potential data breach"""
        # Phase 1: Initial Detection and Validation
        incident = await self._validate_and_classify_alert(alert)
        if not incident:
            return None

        # Create incident record
        incident_id = self._generate_incident_id()
        incident_record = DataBreachIncident(
            incident_id=incident_id,
            detected_at=datetime.utcnow(),
            severity=incident['severity'],
            affected_systems=incident['affected_systems'],
            data_categories=incident['data_categories'],
            estimated_records=incident['estimated_records'],
            attack_vector=incident.get('attack_vector'),
            threat_actor=incident.get('threat_actor'),
            current_phase=IncidentPhase.DETECTION,
            containment_status='not_started',
            regulatory_requirements=self._identify_regulatory_requirements(
                incident['data_categories']
            )
        )

        # Start incident timeline
        await self._initialize_incident_timeline(incident_record)

        # Phase 2: Triage and Initial Response
        incident_record.current_phase = IncidentPhase.TRIAGE

        # Activate incident response team
        response_team = await self._activate_response_team(
            incident_record.severity
        )

        # Begin evidence preservation
        asyncio.create_task(
            self.forensics_collector.preserve_evidence(incident_record)
        )

        # Phase 3: Containment
        incident_record.current_phase = IncidentPhase.CONTAINMENT
        containment_result = await self._execute_containment(incident_record)

        # Phase 4: Investigation and Eradication
        if containment_result.success:
            incident_record.current_phase = IncidentPhase.ERADICATION
            await self._investigate_and_eradicate(incident_record)

        # Phase 5: Recovery
        incident_record.current_phase = IncidentPhase.RECOVERY
        await self._execute_recovery(incident_record)

        # Phase 6: Post-Incident Activities
        incident_record.current_phase = IncidentPhase.POST_INCIDENT
        await self._post_incident_activities(incident_record)

        return incident_record

    async def _execute_containment(self, incident: DataBreachIncident):
        """Execute containment strategies based on incident type"""
        containment_actions = []

        # Immediate network isolation
        if incident.severity >= IncidentSeverity.HIGH:
            isolation_result = await self.containment_engine.isolate_systems(
                incident.affected_systems
            )
            containment_actions.append({
                'action': 'network_isolation',
                'systems': incident.affected_systems,
                'result': isolation_result,
                'timestamp': datetime.utcnow()
            })

        # Revoke compromised credentials
        compromised_creds = await self._identify_compromised_credentials(
            incident
        )
        for cred_set in compromised_creds:
            revoke_result = await self._revoke_credentials(cred_set)
            containment_actions.append({
                'action': 'credential_revocation',
                'credentials': cred_set['id'],
                'result': revoke_result,
                'timestamp': datetime.utcnow()
            })

        # Block attack vectors
        if incident.attack_vector:
            block_result = await self._block_attack_vector(
                incident.attack_vector
            )
            containment_actions.append({
                'action': 'vector_blocking',
                'vector': incident.attack_vector,
                'result': block_result,
                'timestamp': datetime.utcnow()
            })

        # Data access restrictions
        restriction_result = await self._restrict_data_access(
            incident.data_categories
        )
        containment_actions.append({
            'action': 'access_restriction',
            'data_categories': incident.data_categories,
            'result': restriction_result,
            'timestamp': datetime.utcnow()
        })

        # Update incident status
        incident.containment_status = self._evaluate_containment_status(
            containment_actions
        )

        return ContainmentResult(
            success=incident.containment_status == 'contained',
            actions=containment_actions,
            remaining_risks=self._identify_remaining_risks(incident)
        )

    async def _investigate_and_eradicate(self, incident: DataBreachIncident):
        """Investigate root cause and eradicate threat"""
        # Forensic analysis
        forensic_data = await self.forensics_collector.analyze_incident(
            incident
        )

        # Timeline reconstruction
        attack_timeline = await self._reconstruct_attack_timeline(
            forensic_data
        )

        # Impact assessment
        impact_assessment = await self._assess_data_impact(incident)

        # Identify all affected data
        affected_data = await self._enumerate_affected_data(
            incident,
            forensic_data
        )

        # Eradication actions
        eradication_tasks = []

        # Remove malware/backdoors
        if forensic_data.get('malware_indicators'):
            eradication_tasks.append(
                self._remove_malware(forensic_data['malware_indicators'])
            )

        # Patch vulnerabilities
        if forensic_data.get('exploited_vulnerabilities'):
            eradication_tasks.append(
                self._patch_vulnerabilities(
                    forensic_data['exploited_vulnerabilities']
                )
            )

        # Reset compromised systems
        if forensic_data.get('compromised_systems'):
            eradication_tasks.append(
                self._reset_systems(forensic_data['compromised_systems'])
            )

        # Execute eradication
        eradication_results = await asyncio.gather(*eradication_tasks)

        # Verify eradication
        verification = await self._verify_threat_eradication(incident)

        return {
            'timeline': attack_timeline,
            'impact': impact_assessment,
            'affected_data': affected_data,
            'eradication_results': eradication_results,
            'verification': verification
        }
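
# RecoveryPhase, RecoveryPlan, and System are referenced by DataRecoveryManager below
# but are not defined in the original listing; these dataclasses are assumed sketches
# of the shapes the recovery code expects.
@dataclass
class System:
    id: str
    integrity_metadata: Dict
    backup_key_id: str
    incident_id: str

@dataclass
class RecoveryPhase:
    name: str
    systems: List[System]
    recovery_point: datetime

@dataclass
class RecoveryPlan:
    incident_id: str
    priority_order: List[RecoveryPhase]
    recovery_points: List[datetime]
    validation_requirements: List[str]
    timeline: Dict
    rollback_procedures: List[str]
    communication_plan: Dict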

class DataRecoveryManager:
    """Manage data recovery after security incidents"""
    def __init__(self, config):
        self.config = config
        self.backup_system = BackupSystem()
        self.integrity_validator = IntegrityValidator()
        self.encryption_manager = EncryptionManager()

    async def plan_recovery(self, incident: DataBreachIncident) -> RecoveryPlan:
        """Create comprehensive recovery plan"""
        # Assess recovery requirements
        requirements = await self._assess_recovery_requirements(incident)

        # Identify recovery sources
        recovery_sources = await self._identify_recovery_sources(
            incident.affected_systems,
            incident.data_categories
        )

        # Create recovery timeline
        timeline = self._create_recovery_timeline(
            requirements,
            recovery_sources
        )

        # Build recovery plan
        plan = RecoveryPlan(
            incident_id=incident.incident_id,
            priority_order=self._determine_recovery_priority(requirements),
            recovery_points=self._identify_recovery_points(recovery_sources),
            validation_requirements=self._define_validation_requirements(
                incident
            ),
            timeline=timeline,
            rollback_procedures=self._create_rollback_procedures(),
            communication_plan=self._create_communication_plan(incident)
        )

        return plan

    async def execute_recovery(self, plan: RecoveryPlan):
        """Execute data recovery plan"""
        recovery_status = {
            'started_at': datetime.utcnow(),
            'phases': [],
            'issues': [],
            'completed': False
        }

        for phase in plan.priority_order:
            phase_result = await self._execute_recovery_phase(phase, plan)
            recovery_status['phases'].append(phase_result)

            if not phase_result['success']:
                # Handle recovery failure
                await self._handle_recovery_failure(phase, phase_result)
                recovery_status['issues'].append({
                    'phase': phase.name,
                    'error': phase_result.get('error'),
                    'timestamp': datetime.utcnow()
                })

                # Attempt alternative recovery
                alt_result = await self._attempt_alternative_recovery(
                    phase, plan
                )
                if not alt_result['success']:
                    break

        # Validate recovered data
        validation_results = await self._validate_recovered_data(
            plan,
            recovery_status
        )
        recovery_status['validation'] = validation_results

        recovery_status['completed'] = all(
            phase['success'] for phase in recovery_status['phases']
        )
        recovery_status['completed_at'] = datetime.utcnow()

        return recovery_status

    async def _execute_recovery_phase(self, phase: RecoveryPhase, plan: RecoveryPlan):
        """Execute individual recovery phase"""
        phase_result = {
            'phase': phase.name,
            'started_at': datetime.utcnow(),
            'success': False,
            'data_recovered': 0,
            'systems_restored': []
        }

        try:
            # Prepare recovery environment
            env = await self._prepare_recovery_environment(phase)

            # Restore data from backup
            for system in phase.systems:
                restore_result = await self._restore_system_data(
                    system,
                    phase.recovery_point,
                    env
                )

                if restore_result['success']:
                    # Verify data integrity
                    integrity_check = await self.integrity_validator.verify(
                        restore_result['data'],
                        system.integrity_metadata
                    )

                    if integrity_check.valid:
                        # Re-encrypt with new keys
                        reencrypted = await self._reencrypt_recovered_data(
                            restore_result['data'],
                            system
                        )

                        # Deploy recovered data
                        deploy_result = await self._deploy_recovered_data(
                            reencrypted,
                            system
                        )

                        phase_result['systems_restored'].append({
                            'system': system.id,
                            'records_recovered': deploy_result['record_count'],
                            'integrity_verified': True
                        })
                        phase_result['data_recovered'] += deploy_result['record_count']
                    else:
                        phase_result['integrity_failures'] = integrity_check.failures

            phase_result['success'] = len(phase_result['systems_restored']) > 0
            phase_result['completed_at'] = datetime.utcnow()

        except Exception as e:
            phase_result['error'] = str(e)
            phase_result['stack_trace'] = self._get_sanitized_stack_trace()

        return phase_result

    async def _reencrypt_recovered_data(self, data: bytes, system: System) -> bytes:
        """Re-encrypt data with new keys after incident"""
        # Generate new encryption keys
        new_keys = await self.encryption_manager.generate_incident_keys(
            system.id,
            key_type='recovery'
        )

        # Decrypt with backup keys
        decrypted = await self.encryption_manager.decrypt_backup_data(
            data,
            system.backup_key_id
        )

        # Re-encrypt with new keys
        reencrypted = await self.encryption_manager.encrypt_data(
            decrypted,
            new_keys['data_key'],
            additional_data={
                'recovery_date': datetime.utcnow().isoformat(),
                'incident_id': system.incident_id,
                'key_rotation_id': new_keys['rotation_id']
            }
        )

        # Secure key storage
        await self._store_recovery_keys(new_keys, system)

        return reencrypted
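A minimal invocation of the orchestrator might look like the sketch below. The alert fields and configuration values are illustrative, and it assumes the collaborating subsystems (BreachDetectionSystem, ContainmentEngine, ForensicsCollector, RecoveryManager, NotificationSystem) and the private helper methods referenced in the listing are implemented elsewhere.
# Illustrative usage sketch; assumes the supporting subsystems and helper methods exist.
async def main():
    orchestrator = IncidentResponseOrchestrator(config={'environment': 'production'})
    alert = {
        'source': 'dlp_monitor',                      # hypothetical detection source
        'affected_systems': ['customer-db-primary'],  # hypothetical system identifier
        'data_categories': ['pii', 'payment_data'],
        'estimated_records': 25000,
    }
    incident = await orchestrator.handle_potential_breach(alert)
    if incident:
        print(incident.incident_id, incident.current_phase, incident.containment_status)

asyncio.run(main())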