Real-time Security Event Detection

Real-time Security Event Detection

Real-time security event detection requires stream processing that can analyze millions of events per second. Signature-based detection alone misses novel attacks and subtle behavioral anomalies, so modern detection systems combine rule-based detection, statistical analysis, and machine learning to identify security threats as they occur.

#!/usr/bin/env python3
# security_event_processor.py - Real-time security event processing

import asyncio
import json
from typing import Dict, List, Any
from datetime import datetime, timedelta
import numpy as np
from sklearn.ensemble import IsolationForest
import redis
from kafka import KafkaConsumer, KafkaProducer
import logging

class SecurityEventProcessor:
    """Consume security events from Kafka, detect incidents, and emit alerts.

    Each event read from the ``security-events`` topic is enriched, run
    through rule-based, anomaly, behavioral, and correlation checks, and,
    when any check reports a detection, turned into an alert that is sent to
    the ``security-alerts`` topic.  Alerts whose severity is ``'critical'``
    additionally trigger the automated response.

    NOTE(review): many ``self._...`` helpers called below (for example
    ``_initialize_anomaly_detection``, ``_get_user_context``,
    ``_get_failed_attempts``) are not defined in this class body; they are
    presumably supplied elsewhere -- confirm.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and build the Redis client and detectors.

        Args:
            config: Settings mapping.  This class reads the keys
                ``redis_host``, ``redis_port``, ``kafka_brokers`` and
                ``behavior_threshold``.
        """
        self.config = config
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port'],
            decode_responses=True
        )
        self.anomaly_detector = self._initialize_anomaly_detection()
        self.event_patterns = self._load_event_patterns()
        self.threat_feeds = self._load_threat_intelligence()

    async def process_event_stream(self):
        """Process security events in real-time.

        Reads JSON messages from the ``security-events`` topic, and for every
        event that yields at least one detection sends an alert to the
        ``security-alerts`` topic.  The Kafka consumer and producer are closed
        when the loop ends or an exception propagates, so connections are not
        leaked and buffered alerts are flushed.

        NOTE(review): iterating the ``KafkaConsumer`` is a synchronous call
        inside a coroutine, so the event loop is presumably blocked while
        waiting for the next message -- confirm this is acceptable.
        """
        consumer = KafkaConsumer(
            'security-events',
            bootstrap_servers=self.config['kafka_brokers'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        try:
            producer = KafkaProducer(
                bootstrap_servers=self.config['kafka_brokers'],
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
            try:
                for message in consumer:
                    event = message.value

                    # Enrich event with context
                    enriched_event = await self._enrich_event(event)

                    # Detect security incidents
                    detections = await self._detect_incidents(enriched_event)
                    if not detections:
                        continue

                    # Generate alert and send it to the alert topic
                    alert = self._create_alert(enriched_event, detections)
                    producer.send('security-alerts', value=alert)

                    # Update metrics
                    self._update_metrics(alert)

                    # Trigger automated response if configured
                    if alert['severity'] == 'critical':
                        await self._trigger_automated_response(alert)
            finally:
                # send() only queues the record; flush before closing so
                # pending alerts are not dropped on shutdown or error.
                producer.flush()
                producer.close()
        finally:
            consumer.close()

    async def _enrich_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``event`` with additional context attached.

        Adds ``user_context`` when the event has a ``user_id``,
        ``asset_context`` when it has an ``asset_id``, and always adds
        ``historical_stats`` and ``threat_intel``.  The input dict itself is
        not modified.
        """
        enriched = event.copy()

        # Add user context
        if 'user_id' in event:
            user_context = await self._get_user_context(event['user_id'])
            enriched['user_context'] = user_context

        # Add asset context
        if 'asset_id' in event:
            asset_context = await self._get_asset_context(event['asset_id'])
            enriched['asset_context'] = asset_context

        # Add historical context
        enriched['historical_stats'] = await self._get_historical_stats(event)

        # Add threat intelligence
        enriched['threat_intel'] = await self._check_threat_intelligence(event)

        return enriched

    async def _detect_incidents(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Detect security incidents using multiple techniques.

        Returns:
            The detections from the rule-based, anomaly, behavioral and
            correlation checks, concatenated in that order.  Empty when
            nothing fired.
        """
        detections = []

        # Rule-based detection
        rule_detections = self._apply_detection_rules(event)
        detections.extend(rule_detections)

        # Anomaly detection
        if self._is_anomalous(event):
            detections.append({
                'type': 'anomaly',
                'confidence': self._calculate_anomaly_score(event),
                'description': 'Unusual behavior detected'
            })

        # Behavioral analysis
        behavioral_detections = await self._analyze_behavior(event)
        detections.extend(behavioral_detections)

        # Correlation analysis
        correlation_detections = await self._correlate_events(event)
        detections.extend(correlation_detections)

        return detections

    def _apply_detection_rules(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Apply rule-based detection keyed on the event's ``event_type``.

        ``authentication`` events are checked for brute force (five or more
        failed attempts in the last five minutes) and impossible travel,
        ``authorization`` events for privilege escalation, and ``data_access``
        events for data exfiltration.

        Returns:
            A list of detection dicts, each with ``type``, ``confidence`` and
            ``description``.  Empty when no rule matched.
        """
        detections = []
        event_type = event.get('event_type')

        # Authentication anomalies
        if event_type == 'authentication':
            # Multiple failed attempts.  The count is per user, so the check
            # is skipped (rather than raising KeyError) when the event does
            # not identify a user.
            user_id = event.get('user_id')
            if user_id is not None:
                failed_attempts = self._get_failed_attempts(
                    user_id,
                    timedelta(minutes=5)
                )
                if failed_attempts >= 5:
                    detections.append({
                        'type': 'brute_force',
                        'confidence': 0.9,
                        'description': f'Multiple failed login attempts: {failed_attempts}'
                    })

            # Impossible travel
            if self._check_impossible_travel(event):
                detections.append({
                    'type': 'impossible_travel',
                    'confidence': 0.95,
                    'description': 'Login from geographically impossible location'
                })

        # Privilege escalation
        if event_type == 'authorization':
            if self._detect_privilege_escalation(event):
                detections.append({
                    'type': 'privilege_escalation',
                    'confidence': 0.85,
                    'description': 'Unusual privilege elevation detected'
                })

        # Data exfiltration
        if event_type == 'data_access':
            if self._detect_data_exfiltration(event):
                detections.append({
                    'type': 'data_exfiltration',
                    'confidence': 0.8,
                    'description': 'Potential data exfiltration pattern'
                })

        return detections

    async def _analyze_behavior(self, event: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Analyze user and entity behavior against a stored baseline.

        Returns:
            A single ``behavioral_anomaly`` detection when a baseline exists
            and the deviation score exceeds ``config['behavior_threshold']``;
            otherwise an empty list.
        """
        detections = []

        # Get baseline behavior
        baseline = await self._get_behavior_baseline(
            event.get('user_id'),
            event.get('asset_id')
        )

        # Compare current behavior to baseline
        if baseline:
            deviation_score = self._calculate_deviation(event, baseline)

            if deviation_score > self.config['behavior_threshold']:
                detections.append({
                    'type': 'behavioral_anomaly',
                    # Score is divided by 100 and capped at 1.0 to yield a
                    # confidence value.
                    'confidence': min(deviation_score / 100, 1.0),
                    'description': 'Significant deviation from normal behavior',
                    'details': {
                        'deviation_score': deviation_score,
                        'baseline_period': baseline['period'],
                        'anomalous_features': self._identify_anomalous_features(
                            event, baseline
                        )
                    }
                })

        return detections

    def _create_alert(self, event: Dict[str, Any],
                     detections: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create security alert from detections.

        Args:
            event: The (enriched) event that produced the detections.
            detections: Detection dicts; each must carry a ``confidence``.

        Returns:
            The alert dict.  Its ``confidence`` is the highest confidence
            among ``detections`` (``0.0`` if the list is empty) and its
            ``timestamp`` is the current naive UTC time in ISO format.
        """
        # Calculate overall severity
        max_confidence = max((d['confidence'] for d in detections), default=0.0)
        severity = self._calculate_severity(detections, event)

        alert = {
            'alert_id': self._generate_alert_id(),
            'timestamp': datetime.utcnow().isoformat(),
            'severity': severity,
            'confidence': max_confidence,
            'event': event,
            'detections': detections,
            'recommended_actions': self._get_recommended_actions(detections),
            'context': {
                'affected_assets': self._identify_affected_assets(event),
                'related_alerts': self._find_related_alerts(event),
                'attack_chain_position': self._analyze_attack_chain(event)
            }
        }

        return alert

    async def _trigger_automated_response(self, alert: Dict[str, Any]):
        """Trigger automated incident response for an alert.

        For each ``brute_force`` detection the source IP is blocked and the
        account disabled for 30 minutes; for each ``data_exfiltration``
        detection the user's tokens are revoked and forensics are captured
        for the asset.  Other detection types trigger no action.  The
        collected action results are then logged and sent to the security
        team.

        NOTE(review): ``source_ip``, ``user_id`` and ``asset_id`` are read
        with ``.get`` and may be ``None``; the response helpers are assumed to
        cope with that -- confirm.
        """
        response_actions = []

        for detection in alert['detections']:
            if detection['type'] == 'brute_force':
                # Block source IP
                action = await self._block_ip_address(
                    alert['event'].get('source_ip')
                )
                response_actions.append(action)

                # Disable account temporarily
                action = await self._disable_account(
                    alert['event'].get('user_id'),
                    duration=timedelta(minutes=30)
                )
                response_actions.append(action)

            elif detection['type'] == 'data_exfiltration':
                # Revoke access tokens
                action = await self._revoke_tokens(
                    alert['event'].get('user_id')
                )
                response_actions.append(action)

                # Initiate forensic capture
                action = await self._capture_forensics(
                    alert['event'].get('asset_id')
                )
                response_actions.append(action)

        # Log response actions
        await self._log_response_actions(alert['alert_id'], response_actions)

        # Notify security team
        await self._notify_security_team(alert, response_actions)

# SOAR integration for automated response
class SecurityOrchestrationAutomation:
    """Run response playbooks against security alerts.

    NOTE(review): ``_load_playbooks``, ``_initialize_connectors``,
    ``_match_playbooks``, ``_evaluate_conditions``, ``_execute_action`` and
    ``_track_execution`` are called here but not defined in this class body;
    they are presumably supplied elsewhere -- confirm.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and load playbooks and connectors."""
        self.config = config
        self.playbooks = self._load_playbooks()
        self.connectors = self._initialize_connectors()

    async def handle_security_alert(self, alert: Dict[str, Any]):
        """Orchestrate response to security alert.

        Executes every playbook matched for ``alert`` one after another and
        records each result under the alert's ``alert_id`` and the playbook's
        ``name``.
        """
        # Determine applicable playbooks
        applicable_playbooks = self._match_playbooks(alert)

        for playbook in applicable_playbooks:
            # Execute playbook
            execution_result = await self._execute_playbook(
                playbook,
                alert
            )

            # Track execution
            await self._track_execution(
                alert['alert_id'],
                playbook['name'],
                execution_result
            )

    async def _execute_playbook(self, playbook: Dict[str, Any],
                               alert: Dict[str, Any]) -> Dict[str, Any]:
        """Execute security response playbook.

        Steps run in order.  A step whose conditions do not hold is skipped,
        and a step whose result has a truthy ``stop_execution`` ends the run
        early.  An action that returns ``None`` is recorded and execution
        continues with the next step.

        Args:
            playbook: Mapping with a ``steps`` list; each step has an
                ``action`` mapping (with a ``type``) and optional
                ``conditions``.
            alert: The alert being responded to.

        Returns:
            The execution context: ``alert``, ``start_time`` and ``end_time``
            (naive UTC ``datetime`` objects), ``actions_executed``, and
            ``status`` (``'completed'`` or ``'failed'``).  On failure it also
            holds ``error`` with the exception text.  Exceptions raised by a
            step are caught and logged, not propagated.
        """
        execution_context = {
            'alert': alert,
            'start_time': datetime.utcnow(),
            'actions_executed': [],
            'status': 'running'
        }

        try:
            for step in playbook['steps']:
                # Check conditions
                if not self._evaluate_conditions(step.get('conditions', []),
                                               execution_context):
                    continue

                # Execute action
                action_result = await self._execute_action(
                    step['action'],
                    execution_context
                )

                execution_context['actions_executed'].append({
                    'action': step['action']['type'],
                    'result': action_result,
                    'timestamp': datetime.utcnow().isoformat()
                })

                # Check if we should continue.  An action with nothing to
                # report (None) must not abort the whole playbook.
                if action_result is not None and action_result.get('stop_execution'):
                    break

            execution_context['status'] = 'completed'

        except Exception as e:
            # Boundary handler: a failing playbook is reported through the
            # returned context instead of propagating to the caller.
            execution_context['status'] = 'failed'
            execution_context['error'] = str(e)
            # logging.exception keeps the traceback alongside the message.
            logging.exception("Playbook execution failed: %s", e)

        execution_context['end_time'] = datetime.utcnow()
        return execution_context