Building a Comprehensive Security Monitoring Architecture

Effective security monitoring requires visibility across all layers of the technology stack. Network traffic, application logs, database queries, user behavior, and system metrics all provide crucial signals for detecting potential breaches. However, the sheer volume of data generated by modern systems can overwhelm traditional monitoring approaches. Successful architectures employ intelligent filtering, correlation, and prioritization to transform raw data into actionable intelligence.

The foundation of security monitoring lies in comprehensive log collection and centralization. Every system component should generate detailed logs capturing security-relevant events. These logs must flow into centralized systems that can correlate events across different sources. However, logging itself can create security risks if sensitive data appears in logs or if log systems become attractive targets for attackers seeking to cover their tracks.

# Example: Comprehensive security monitoring system
import asyncio
import json
import logging
import re
from collections import defaultdict
from datetime import datetime, timedelta

import aioredis
import numpy as np
from elasticsearch import AsyncElasticsearch
from prometheus_client import Counter, Histogram, Gauge
from sklearn.ensemble import IsolationForest

class SecurityMonitoringSystem:
    """Central security-event pipeline.

    Sanitizes incoming events, enriches them with context and threat
    intelligence, persists them to a daily Elasticsearch index, and runs
    rule-based, behavioral, and ML anomaly detection over the stream.
    Exposes Prometheus metrics about its own operation.

    NOTE(review): several helpers referenced below (_enrich_event,
    _check_threat_intelligence, _load_ml_models, _process_event_stream,
    _update_threat_intelligence, _anonymize_ip, _handle_detection,
    _get_user_baseline, _get_geo_location, _score_to_severity,
    _get_recent_events, the _detect_* correlators, and AlertManager) are
    defined elsewhere in the project.
    """

    # Ordered severity scale. A plain max() over the label strings compares
    # lexicographically ('low' > 'high'), so every severity comparison in
    # this class must go through this map instead.
    _SEVERITY_ORDER = {'info': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}

    @classmethod
    def _max_severity(cls, *severities):
        """Return the highest-ranked severity label; unknown labels rank lowest."""
        return max(severities, key=lambda s: cls._SEVERITY_ORDER.get(s, 0))

    def __init__(self, config):
        """Wire up storage clients, alerting, and self-monitoring metrics.

        Args:
            config: mapping with at least 'elasticsearch', 'alerting',
                and 'redis_url' entries.
        """
        self.config = config
        # Fix: self.logger was used on error paths (_extract_features,
        # _run_correlation_engine) but never initialized, raising
        # AttributeError exactly when an error needed logging.
        self.logger = logging.getLogger(__name__)
        self.es_client = AsyncElasticsearch(config['elasticsearch'])
        self.redis_client = None  # created asynchronously in initialize()
        self.ml_models = {}
        self.alert_manager = AlertManager(config['alerting'])
        self.metrics = self._initialize_metrics()

    def _initialize_metrics(self):
        """Initialize Prometheus metrics for monitoring the monitoring system."""
        return {
            'events_processed': Counter(
                'security_events_processed_total',
                'Total security events processed',
                ['event_type', 'severity']
            ),
            'anomalies_detected': Counter(
                'security_anomalies_detected_total',
                'Total anomalies detected',
                ['anomaly_type']
            ),
            'processing_time': Histogram(
                'security_event_processing_seconds',
                'Time to process security events'
            ),
            'active_threats': Gauge(
                'active_security_threats',
                'Currently active security threats'
            )
        }

    async def initialize(self):
        """Connect to Redis, load ML models, and start background workers."""
        # NOTE(review): aioredis.create_redis_pool is the legacy (<2.0)
        # aioredis API; keep the dependency pinned or migrate on upgrade.
        self.redis_client = await aioredis.create_redis_pool(
            self.config['redis_url'],
            encoding='utf-8'
        )

        # Load ML models for anomaly detection
        await self._load_ml_models()

        # Start background tasks (fire-and-forget; each loops internally)
        asyncio.create_task(self._process_event_stream())
        asyncio.create_task(self._run_correlation_engine())
        asyncio.create_task(self._update_threat_intelligence())

    async def ingest_event(self, event):
        """Sanitize, enrich, persist, and analyze a single security event."""
        # Strip sensitive data before the event touches any storage or log.
        sanitized_event = self._sanitize_event(event)

        # Enrich with context
        enriched_event = await self._enrich_event(sanitized_event)

        # Escalate severity when the event matches threat intelligence.
        threat_match = await self._check_threat_intelligence(enriched_event)
        if threat_match:
            enriched_event['threat_indicators'] = threat_match
            # Fix: the original used max() over the label strings, which is
            # lexicographic and ranks 'low'/'medium' ABOVE 'high', so the
            # escalation never happened. Compare by rank instead.
            enriched_event['severity'] = self._max_severity(
                enriched_event.get('severity', 'low'),
                'high'
            )

        # Store in a daily Elasticsearch index
        await self.es_client.index(
            index=f"security-events-{datetime.utcnow().strftime('%Y.%m.%d')}",
            body=enriched_event
        )

        # Real-time analysis
        await self._analyze_event(enriched_event)

        # Update metrics
        self.metrics['events_processed'].labels(
            event_type=enriched_event.get('event_type', 'unknown'),
            severity=enriched_event.get('severity', 'info')
        ).inc()

    def _sanitize_event(self, event):
        """Return a recursively sanitized copy of *event*.

        SSNs, credit cards, emails, and password values are replaced with
        [REDACTED_*] markers; IP addresses are anonymized via
        self._anonymize_ip so they stay usable for correlation.
        """
        sanitized = event.copy()

        # Patterns for sensitive data
        patterns = {
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'ip_address': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
            'password': r'password["\s:=]+["\']?([^"\'\s]+)["\']?'
        }

        # Recursively walk strings, dicts, and lists.
        def sanitize_value(value):
            if isinstance(value, str):
                for pattern_name, pattern in patterns.items():
                    if pattern_name == 'ip_address':
                        # Anonymize IPs instead of removing
                        value = re.sub(pattern, lambda m: self._anonymize_ip(m.group()), value)
                    else:
                        value = re.sub(pattern, f'[REDACTED_{pattern_name.upper()}]', value)
                return value
            elif isinstance(value, dict):
                return {k: sanitize_value(v) for k, v in value.items()}
            elif isinstance(value, list):
                return [sanitize_value(item) for item in value]
            return value

        return sanitize_value(sanitized)

    async def _analyze_event(self, event):
        """Run every detector over *event* and dispatch each finding."""
        analysis_results = []

        # Rule-based detection
        rule_matches = await self._run_detection_rules(event)
        if rule_matches:
            analysis_results.extend(rule_matches)

        # Behavioral analysis
        behavioral_anomalies = await self._behavioral_analysis(event)
        if behavioral_anomalies:
            analysis_results.extend(behavioral_anomalies)

        # ML-based anomaly detection
        ml_anomalies = await self._ml_anomaly_detection(event)
        if ml_anomalies:
            analysis_results.extend(ml_anomalies)

        # Process detection results
        for result in analysis_results:
            await self._handle_detection(event, result)

    async def _run_detection_rules(self, event):
        """Apply static detection rules; return a list of rule-match dicts."""
        matches = []

        # Failed login attempts: count per-user in a rolling Redis key.
        if event.get('event_type') == 'authentication_failure':
            key = f"failed_login:{event.get('user_id', 'unknown')}"
            count = await self.redis_client.incr(key)
            await self.redis_client.expire(key, 300)  # 5 minute window

            if count >= 5:
                matches.append({
                    'rule': 'excessive_failed_logins',
                    'severity': 'medium',
                    'confidence': 0.9,
                    'description': f"User {event.get('user_id')} has {count} failed login attempts"
                })

        # Privilege escalation
        if event.get('event_type') == 'permission_change':
            if 'admin' in event.get('new_permissions', []):
                matches.append({
                    'rule': 'privilege_escalation',
                    'severity': 'high',
                    'confidence': 0.8,
                    'description': 'User gained administrative privileges'
                })

        # Data exfiltration patterns
        if event.get('event_type') == 'data_access':
            volume = event.get('data_volume_bytes', 0)
            if volume > 100 * 1024 * 1024:  # 100MB
                matches.append({
                    'rule': 'large_data_access',
                    'severity': 'medium',
                    'confidence': 0.7,
                    'description': f"Large data access: {volume / 1024 / 1024:.2f}MB"
                })

        # SQL injection patterns (simple substring heuristics)
        if event.get('event_type') == 'database_query':
            query = event.get('query', '').lower()
            injection_patterns = [
                'union select',
                'or 1=1',
                'drop table',
                '; exec',
                'xp_cmdshell'
            ]

            for pattern in injection_patterns:
                if pattern in query:
                    matches.append({
                        'rule': 'sql_injection_attempt',
                        'severity': 'critical',
                        'confidence': 0.95,
                        'description': f"SQL injection pattern detected: {pattern}"
                    })
                    break  # one match is enough to flag the query

        return matches

    async def _behavioral_analysis(self, event):
        """Compare the event against the user's baseline; return anomalies."""
        anomalies = []

        if event.get('user_id'):
            user_id = event['user_id']

            # Baseline is built elsewhere; None means no history yet.
            baseline = await self._get_user_baseline(user_id)

            # Check access time
            current_hour = datetime.utcnow().hour
            if baseline and current_hour not in baseline['typical_hours']:
                anomalies.append({
                    'type': 'unusual_access_time',
                    'severity': 'low',
                    'confidence': 0.6,
                    'description': f"User {user_id} accessing system at unusual time"
                })

            # Check access location
            if event.get('ip_address'):
                location = await self._get_geo_location(event['ip_address'])
                if baseline and location not in baseline['typical_locations']:
                    anomalies.append({
                        'type': 'unusual_location',
                        'severity': 'medium',
                        'confidence': 0.7,
                        'description': f"User {user_id} accessing from unusual location: {location}"
                    })

            # Check access patterns
            if event.get('resource_accessed'):
                if baseline and event['resource_accessed'] not in baseline['accessed_resources']:
                    anomalies.append({
                        'type': 'unusual_resource_access',
                        'severity': 'medium',
                        'confidence': 0.65,
                        'description': f"User {user_id} accessing unusual resource"
                    })

        return anomalies

    async def _ml_anomaly_detection(self, event):
        """Score the event with the general and per-type ML models."""
        anomalies = []

        # Prepare feature vector
        features = self._extract_features(event)

        if features is not None:
            # General model: IsolationForest returns -1 for outliers.
            if 'general' in self.ml_models:
                model = self.ml_models['general']
                prediction = model.predict([features])

                if prediction[0] == -1:  # Anomaly
                    score = model.score_samples([features])[0]
                    anomalies.append({
                        'type': 'ml_anomaly',
                        'severity': self._score_to_severity(score),
                        'confidence': abs(score),
                        'description': 'Machine learning model detected anomalous behavior'
                    })

            # Specific models for different event types
            event_type = event.get('event_type')
            if event_type in self.ml_models:
                model = self.ml_models[event_type]
                prediction = model.predict([features])

                if prediction[0] == -1:
                    score = model.score_samples([features])[0]
                    anomalies.append({
                        'type': f'ml_anomaly_{event_type}',
                        'severity': self._score_to_severity(score),
                        'confidence': abs(score),
                        'description': f'Anomalous {event_type} detected'
                    })

        return anomalies

    def _extract_features(self, event):
        """Extract a numeric feature vector from *event*, or None on failure."""
        try:
            features = []

            # Time-based features
            timestamp = datetime.fromisoformat(event.get('timestamp', datetime.utcnow().isoformat()))
            features.extend([
                timestamp.hour,
                timestamp.weekday(),
                timestamp.day,
                timestamp.month
            ])

            # Event-specific features; hashing gives a stable-ish categorical
            # encoding without a fitted vocabulary.
            features.append(hash(event.get('event_type', '')) % 1000)
            features.append(hash(event.get('user_id', '')) % 10000)
            features.append(event.get('data_volume_bytes', 0) / 1024)  # KB
            features.append(len(event.get('accessed_resources', [])))

            # Network features: encode IPv4 as its four octets.
            if event.get('ip_address'):
                ip_parts = event['ip_address'].split('.')
                if len(ip_parts) == 4:
                    features.extend([int(part) for part in ip_parts])
                else:
                    features.extend([0, 0, 0, 0])
            else:
                features.extend([0, 0, 0, 0])

            return features
        except Exception as e:
            self.logger.error(f"Feature extraction failed: {e}")
            return None

    async def _run_correlation_engine(self):
        """Background loop: correlate recent events into attack patterns."""
        while True:
            try:
                # Get recent events
                recent_events = await self._get_recent_events(minutes=5)

                # Group by various keys
                by_user = defaultdict(list)
                by_ip = defaultdict(list)
                by_session = defaultdict(list)

                for event in recent_events:
                    if event.get('user_id'):
                        by_user[event['user_id']].append(event)
                    if event.get('ip_address'):
                        by_ip[event['ip_address']].append(event)
                    if event.get('session_id'):
                        by_session[event['session_id']].append(event)

                # Look for attack patterns
                await self._detect_brute_force(by_user, by_ip)
                await self._detect_lateral_movement(by_user)
                await self._detect_data_exfiltration(by_user, by_session)
                await self._detect_privilege_abuse(by_user)

                await asyncio.sleep(30)  # Run every 30 seconds

            except Exception as e:
                self.logger.error(f"Correlation engine error: {e}")
                await asyncio.sleep(60)  # back off after a failure

class ThreatIntelligenceIntegration:
    """Integrate external threat intelligence feeds into the local IOC store.

    NOTE(review): IOCDatabase and the per-feed update helpers
    (_update_stix_feed, _update_csv_feed, _update_api_feed,
    _update_internal_intelligence) are defined elsewhere in the project.
    """

    # Ordered severity scale. A plain max() over the label strings compares
    # lexicographically ('low' > 'high'), so severity aggregation must go
    # through this map instead.
    _SEVERITY_ORDER = {'info': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}

    def __init__(self, config):
        """Store config and open the IOC database.

        Args:
            config: mapping containing a 'feeds' dict of feed configurations.
        """
        self.config = config
        # Fix: self.logger was referenced in update_threat_feeds but
        # never initialized.
        self.logger = logging.getLogger(__name__)
        self.threat_feeds = {}
        self.ioc_database = IOCDatabase()

    async def update_threat_feeds(self):
        """Refresh all configured feeds, then internal intel, then prune."""
        # Update from commercial feeds; one failing feed must not abort
        # the remaining feeds, so each update is isolated.
        for feed_name, feed_config in self.config['feeds'].items():
            try:
                if feed_config['type'] == 'stix':
                    await self._update_stix_feed(feed_name, feed_config)
                elif feed_config['type'] == 'csv':
                    await self._update_csv_feed(feed_name, feed_config)
                elif feed_config['type'] == 'api':
                    await self._update_api_feed(feed_name, feed_config)

            except Exception as e:
                self.logger.error(f"Failed to update {feed_name}: {e}")

        # Update internal threat intelligence
        await self._update_internal_intelligence()

        # Clean expired indicators
        await self.ioc_database.clean_expired()

    async def check_ioc(self, indicator_value, indicator_type):
        """Look up an indicator and aggregate all matches into one summary.

        Returns:
            dict with the worst severity, highest confidence, union of
            sources and tags, and earliest/latest sighting — or None when
            the indicator is unknown.
        """
        matches = await self.ioc_database.search(
            value=indicator_value,
            type=indicator_type
        )

        if not matches:
            return None

        # Fix: severity was aggregated with max() over the label strings,
        # which is lexicographic ('low' > 'critical' > 'high'); rank the
        # labels explicitly so the genuinely worst severity wins.
        worst_severity = max(
            (m['severity'] for m in matches),
            key=lambda s: self._SEVERITY_ORDER.get(s, 0)
        )
        return {
            'severity': worst_severity,
            'confidence': max(m['confidence'] for m in matches),
            'sources': list(set(m['source'] for m in matches)),
            'first_seen': min(m['first_seen'] for m in matches),
            'last_seen': max(m['last_seen'] for m in matches),
            'tags': list(set(tag for m in matches for tag in m.get('tags', [])))
        }