Real-time Security Event Detection
Real-time Security Event Detection
Real-time security event detection requires sophisticated stream processing capabilities that can analyze millions of events per second. Traditional signature-based detection fails to identify novel attacks or subtle behavioral anomalies. Modern detection systems combine rule-based detection, statistical analysis, and machine learning to identify security threats as they occur.
#!/usr/bin/env python3
# security_event_processor.py - Real-time security event processing
import asyncio
import json
from typing import Dict, List, Any
from datetime import datetime, timedelta
import numpy as np
from sklearn.ensemble import IsolationForest
import redis
from kafka import KafkaConsumer, KafkaProducer
import logging
class SecurityEventProcessor:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.redis_client = redis.Redis(
host=config['redis_host'],
port=config['redis_port'],
decode_responses=True
)
self.anomaly_detector = self._initialize_anomaly_detection()
self.event_patterns = self._load_event_patterns()
self.threat_feeds = self._load_threat_intelligence()
async def process_event_stream(self):
"""Process security events in real-time"""
consumer = KafkaConsumer(
'security-events',
bootstrap_servers=self.config['kafka_brokers'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=self.config['kafka_brokers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for message in consumer:
event = message.value
# Enrich event with context
enriched_event = await self._enrich_event(event)
# Detect security incidents
detections = await self._detect_incidents(enriched_event)
if detections:
# Generate alert
alert = self._create_alert(enriched_event, detections)
# Send to alert topic
producer.send('security-alerts', value=alert)
# Update metrics
self._update_metrics(alert)
# Trigger automated response if configured
if alert['severity'] == 'critical':
await self._trigger_automated_response(alert)
async def _enrich_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Enrich event with additional context"""
enriched = event.copy()
# Add user context
if 'user_id' in event:
user_context = await self._get_user_context(event['user_id'])
enriched['user_context'] = user_context
# Add asset context
if 'asset_id' in event:
asset_context = await self._get_asset_context(event['asset_id'])
enriched['asset_context'] = asset_context
# Add historical context
enriched['historical_stats'] = await self._get_historical_stats(event)
# Add threat intelligence
enriched['threat_intel'] = await self._check_threat_intelligence(event)
return enriched
async def _detect_incidents(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Detect security incidents using multiple techniques"""
detections = []
# Rule-based detection
rule_detections = self._apply_detection_rules(event)
detections.extend(rule_detections)
# Anomaly detection
if self._is_anomalous(event):
detections.append({
'type': 'anomaly',
'confidence': self._calculate_anomaly_score(event),
'description': 'Unusual behavior detected'
})
# Behavioral analysis
behavioral_detections = await self._analyze_behavior(event)
detections.extend(behavioral_detections)
# Correlation analysis
correlation_detections = await self._correlate_events(event)
detections.extend(correlation_detections)
return detections
def _apply_detection_rules(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Apply rule-based detection"""
detections = []
# Authentication anomalies
if event.get('event_type') == 'authentication':
# Multiple failed attempts
failed_attempts = self._get_failed_attempts(
event['user_id'],
timedelta(minutes=5)
)
if failed_attempts >= 5:
detections.append({
'type': 'brute_force',
'confidence': 0.9,
'description': f'Multiple failed login attempts: {failed_attempts}'
})
# Impossible travel
if self._check_impossible_travel(event):
detections.append({
'type': 'impossible_travel',
'confidence': 0.95,
'description': 'Login from geographically impossible location'
})
# Privilege escalation
if event.get('event_type') == 'authorization':
if self._detect_privilege_escalation(event):
detections.append({
'type': 'privilege_escalation',
'confidence': 0.85,
'description': 'Unusual privilege elevation detected'
})
# Data exfiltration
if event.get('event_type') == 'data_access':
if self._detect_data_exfiltration(event):
detections.append({
'type': 'data_exfiltration',
'confidence': 0.8,
'description': 'Potential data exfiltration pattern'
})
return detections
async def _analyze_behavior(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Analyze user and entity behavior"""
detections = []
# Get baseline behavior
baseline = await self._get_behavior_baseline(
event.get('user_id'),
event.get('asset_id')
)
# Compare current behavior to baseline
if baseline:
deviation_score = self._calculate_deviation(event, baseline)
if deviation_score > self.config['behavior_threshold']:
detections.append({
'type': 'behavioral_anomaly',
'confidence': min(deviation_score / 100, 1.0),
'description': 'Significant deviation from normal behavior',
'details': {
'deviation_score': deviation_score,
'baseline_period': baseline['period'],
'anomalous_features': self._identify_anomalous_features(
event, baseline
)
}
})
return detections
def _create_alert(self, event: Dict[str, Any],
detections: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Create security alert from detections"""
# Calculate overall severity
max_confidence = max(d['confidence'] for d in detections)
severity = self._calculate_severity(detections, event)
alert = {
'alert_id': self._generate_alert_id(),
'timestamp': datetime.utcnow().isoformat(),
'severity': severity,
'confidence': max_confidence,
'event': event,
'detections': detections,
'recommended_actions': self._get_recommended_actions(detections),
'context': {
'affected_assets': self._identify_affected_assets(event),
'related_alerts': self._find_related_alerts(event),
'attack_chain_position': self._analyze_attack_chain(event)
}
}
return alert
async def _trigger_automated_response(self, alert: Dict[str, Any]):
"""Trigger automated incident response"""
response_actions = []
for detection in alert['detections']:
if detection['type'] == 'brute_force':
# Block source IP
action = await self._block_ip_address(
alert['event'].get('source_ip')
)
response_actions.append(action)
# Disable account temporarily
action = await self._disable_account(
alert['event'].get('user_id'),
duration=timedelta(minutes=30)
)
response_actions.append(action)
elif detection['type'] == 'data_exfiltration':
# Revoke access tokens
action = await self._revoke_tokens(
alert['event'].get('user_id')
)
response_actions.append(action)
# Initiate forensic capture
action = await self._capture_forensics(
alert['event'].get('asset_id')
)
response_actions.append(action)
# Log response actions
await self._log_response_actions(alert['alert_id'], response_actions)
# Notify security team
await self._notify_security_team(alert, response_actions)
# SOAR integration for automated response
class SecurityOrchestrationAutomation:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.playbooks = self._load_playbooks()
self.connectors = self._initialize_connectors()
async def handle_security_alert(self, alert: Dict[str, Any]):
"""Orchestrate response to security alert"""
# Determine applicable playbooks
applicable_playbooks = self._match_playbooks(alert)
for playbook in applicable_playbooks:
# Execute playbook
execution_result = await self._execute_playbook(
playbook,
alert
)
# Track execution
await self._track_execution(
alert['alert_id'],
playbook['name'],
execution_result
)
async def _execute_playbook(self, playbook: Dict[str, Any],
alert: Dict[str, Any]) -> Dict[str, Any]:
"""Execute security response playbook"""
execution_context = {
'alert': alert,
'start_time': datetime.utcnow(),
'actions_executed': [],
'status': 'running'
}
try:
for step in playbook['steps']:
# Check conditions
if not self._evaluate_conditions(step.get('conditions', []),
execution_context):
continue
# Execute action
action_result = await self._execute_action(
step['action'],
execution_context
)
execution_context['actions_executed'].append({
'action': step['action']['type'],
'result': action_result,
'timestamp': datetime.utcnow().isoformat()
})
# Check if we should continue
if action_result.get('stop_execution'):
break
execution_context['status'] = 'completed'
except Exception as e:
execution_context['status'] = 'failed'
execution_context['error'] = str(e)
logging.error(f"Playbook execution failed: {e}")
execution_context['end_time'] = datetime.utcnow()
return execution_context