Building a Comprehensive Security Monitoring Architecture
Effective security monitoring requires visibility across all layers of the technology stack. Network traffic, application logs, database queries, user behavior, and system metrics all provide crucial signals for detecting potential breaches. However, the sheer volume of data generated by modern systems can overwhelm traditional monitoring approaches. Successful architectures employ intelligent filtering, correlation, and prioritization to transform raw data into actionable intelligence.
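As a minimal sketch of that filtering-and-prioritization step, consider the snippet below; the severity labels, the noise list, and the event shape are illustrative assumptions rather than a prescribed schema.

# Sketch: early filtering and numeric prioritization (illustrative names)
SEVERITY_RANK = {'info': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}
NOISY_EVENT_TYPES = {'heartbeat', 'metrics_scrape'}  # assumed low-value sources

def prioritize(events):
    """Drop known-noise events, then order the rest by descending severity."""
    actionable = [e for e in events if e.get('event_type') not in NOISY_EVENT_TYPES]
    return sorted(actionable,
                  key=lambda e: SEVERITY_RANK.get(e.get('severity', 'info'), 0),
                  reverse=True)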
The foundation of security monitoring lies in comprehensive log collection and centralization. Every system component should generate detailed logs capturing security-relevant events. These logs must flow into centralized systems that can correlate events across different sources. However, logging itself can create security risks if sensitive data appears in logs or if log systems become attractive targets for attackers seeking to cover their tracks.
# Example: Comprehensive security monitoring system
import asyncio
import logging
import re
import zlib
from datetime import datetime, timedelta
from collections import defaultdict

import aioredis  # aioredis 1.x API (create_redis_pool); later versions merged into redis-py
from elasticsearch import AsyncElasticsearch
from prometheus_client import Counter, Histogram, Gauge
from sklearn.ensemble import IsolationForest  # used when training/loading the models (not shown)

# Numeric severity ranking; comparing the label strings directly would sort
# them alphabetically ('low' > 'high')
SEVERITY_RANK = {'info': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}
class SecurityMonitoringSystem:
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.es_client = AsyncElasticsearch(config['elasticsearch'])
        self.redis_client = None
        self.ml_models = {}
        # AlertManager is assumed to be defined alongside this class; a sketch
        # of one possible shape appears after this class definition
        self.alert_manager = AlertManager(config['alerting'])
        self.metrics = self._initialize_metrics()
def _initialize_metrics(self):
"""Initialize Prometheus metrics for monitoring the monitoring system"""
return {
'events_processed': Counter(
'security_events_processed_total',
'Total security events processed',
['event_type', 'severity']
),
'anomalies_detected': Counter(
'security_anomalies_detected_total',
'Total anomalies detected',
['anomaly_type']
),
'processing_time': Histogram(
'security_event_processing_seconds',
'Time to process security events'
),
'active_threats': Gauge(
'active_security_threats',
'Currently active security threats'
)
}
async def initialize(self):
"""Initialize monitoring system components"""
self.redis_client = await aioredis.create_redis_pool(
self.config['redis_url'],
encoding='utf-8'
)
# Load ML models for anomaly detection
await self._load_ml_models()
        # Start background tasks, keeping references so they are not garbage-collected
        self._tasks = [
            asyncio.create_task(self._process_event_stream()),
            asyncio.create_task(self._run_correlation_engine()),
            asyncio.create_task(self._update_threat_intelligence()),
        ]
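        # _process_event_stream and _update_threat_intelligence are assumed to be
        # implemented elsewhere; _run_correlation_engine appears below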
async def ingest_event(self, event):
"""Ingest and process security event"""
# Sanitize sensitive data from logs
sanitized_event = self._sanitize_event(event)
# Enrich with context
enriched_event = await self._enrich_event(sanitized_event)
# Check against threat intelligence
threat_match = await self._check_threat_intelligence(enriched_event)
        if threat_match:
            enriched_event['threat_indicators'] = threat_match
            # Escalate to at least 'high' via the numeric ranking; a plain string
            # max() would compare alphabetically and keep 'low' over 'high'
            if SEVERITY_RANK.get(enriched_event.get('severity', 'low'), 0) < SEVERITY_RANK['high']:
                enriched_event['severity'] = 'high'
# Store in Elasticsearch
await self.es_client.index(
index=f"security-events-{datetime.utcnow().strftime('%Y.%m.%d')}",
body=enriched_event
)
# Real-time analysis
await self._analyze_event(enriched_event)
# Update metrics
self.metrics['events_processed'].labels(
event_type=enriched_event.get('event_type', 'unknown'),
severity=enriched_event.get('severity', 'info')
).inc()
def _sanitize_event(self, event):
"""Remove sensitive data from events"""
sanitized = event.copy()
# Patterns for sensitive data
        patterns = {
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
            # fixed character class: [A-Z|a-z] would also match a literal '|'
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'ip_address': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
            # (?i) makes the match case-insensitive (Password=, PASSWORD:)
            'password': r'(?i)password["\s:=]+["\']?([^"\'\s]+)["\']?'
        }
# Recursively sanitize
def sanitize_value(value):
if isinstance(value, str):
for pattern_name, pattern in patterns.items():
if pattern_name == 'ip_address':
# Anonymize IPs instead of removing
value = re.sub(pattern, lambda m: self._anonymize_ip(m.group()), value)
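                        # _anonymize_ip (not shown) might hash the address or zero
                        # the host octets so events remain correlatable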
else:
value = re.sub(pattern, f'[REDACTED_{pattern_name.upper()}]', value)
return value
elif isinstance(value, dict):
return {k: sanitize_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_value(item) for item in value]
return value
return sanitize_value(sanitized)
async def _analyze_event(self, event):
"""Analyze event for security threats"""
analysis_results = []
# Rule-based detection
rule_matches = await self._run_detection_rules(event)
if rule_matches:
analysis_results.extend(rule_matches)
# Behavioral analysis
behavioral_anomalies = await self._behavioral_analysis(event)
if behavioral_anomalies:
analysis_results.extend(behavioral_anomalies)
# ML-based anomaly detection
ml_anomalies = await self._ml_anomaly_detection(event)
if ml_anomalies:
analysis_results.extend(ml_anomalies)
# Process detection results
for result in analysis_results:
await self._handle_detection(event, result)
async def _run_detection_rules(self, event):
"""Apply detection rules to event"""
matches = []
# Failed login attempts
if event.get('event_type') == 'authentication_failure':
key = f"failed_login:{event.get('user_id', 'unknown')}"
count = await self.redis_client.incr(key)
await self.redis_client.expire(key, 300) # 5 minute window
if count >= 5:
matches.append({
'rule': 'excessive_failed_logins',
'severity': 'medium',
'confidence': 0.9,
'description': f"User {event.get('user_id')} has {count} failed login attempts"
})
# Privilege escalation
if event.get('event_type') == 'permission_change':
if 'admin' in event.get('new_permissions', []):
matches.append({
'rule': 'privilege_escalation',
'severity': 'high',
'confidence': 0.8,
'description': 'User gained administrative privileges'
})
# Data exfiltration patterns
if event.get('event_type') == 'data_access':
volume = event.get('data_volume_bytes', 0)
if volume > 100 * 1024 * 1024: # 100MB
matches.append({
'rule': 'large_data_access',
'severity': 'medium',
'confidence': 0.7,
'description': f"Large data access: {volume / 1024 / 1024:.2f}MB"
})
# SQL injection patterns
if event.get('event_type') == 'database_query':
query = event.get('query', '').lower()
injection_patterns = [
'union select',
'or 1=1',
'drop table',
'; exec',
'xp_cmdshell'
]
for pattern in injection_patterns:
if pattern in query:
matches.append({
'rule': 'sql_injection_attempt',
'severity': 'critical',
'confidence': 0.95,
'description': f"SQL injection pattern detected: {pattern}"
})
break
return matches
async def _behavioral_analysis(self, event):
"""Analyze user and entity behavior"""
anomalies = []
if event.get('user_id'):
user_id = event['user_id']
# Get user baseline
baseline = await self._get_user_baseline(user_id)
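            # The baseline (built offline; retrieval not shown) is assumed to be a
            # dict with 'typical_hours', 'typical_locations' and 'accessed_resources'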
# Check access time
current_hour = datetime.utcnow().hour
if baseline and current_hour not in baseline['typical_hours']:
anomalies.append({
'type': 'unusual_access_time',
'severity': 'low',
'confidence': 0.6,
'description': f"User {user_id} accessing system at unusual time"
})
# Check access location
if event.get('ip_address'):
location = await self._get_geo_location(event['ip_address'])
if baseline and location not in baseline['typical_locations']:
anomalies.append({
'type': 'unusual_location',
'severity': 'medium',
'confidence': 0.7,
'description': f"User {user_id} accessing from unusual location: {location}"
})
# Check access patterns
if event.get('resource_accessed'):
if baseline and event['resource_accessed'] not in baseline['accessed_resources']:
anomalies.append({
'type': 'unusual_resource_access',
'severity': 'medium',
'confidence': 0.65,
'description': f"User {user_id} accessing unusual resource"
})
return anomalies
async def _ml_anomaly_detection(self, event):
"""Use machine learning for anomaly detection"""
anomalies = []
# Prepare feature vector
features = self._extract_features(event)
if features is not None:
# Use Isolation Forest for anomaly detection
if 'general' in self.ml_models:
model = self.ml_models['general']
prediction = model.predict([features])
if prediction[0] == -1: # Anomaly
score = model.score_samples([features])[0]
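                    # score_samples is near 0 for typical points and grows more
                    # negative for outliers, so abs(score) serves as a rough confidence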
anomalies.append({
'type': 'ml_anomaly',
'severity': self._score_to_severity(score),
'confidence': abs(score),
'description': 'Machine learning model detected anomalous behavior'
})
# Specific models for different event types
event_type = event.get('event_type')
if event_type in self.ml_models:
model = self.ml_models[event_type]
prediction = model.predict([features])
if prediction[0] == -1:
score = model.score_samples([features])[0]
anomalies.append({
'type': f'ml_anomaly_{event_type}',
'severity': self._score_to_severity(score),
'confidence': abs(score),
'description': f'Anomalous {event_type} detected'
})
return anomalies
def _extract_features(self, event):
"""Extract numerical features from event for ML"""
try:
features = []
# Time-based features
timestamp = datetime.fromisoformat(event.get('timestamp', datetime.utcnow().isoformat()))
features.extend([
timestamp.hour,
timestamp.weekday(),
timestamp.day,
timestamp.month
])
# Event-specific features
            # Use a stable hash: the built-in hash() is salted per process,
            # which would make features inconsistent across restarts
            features.append(zlib.crc32(event.get('event_type', '').encode()) % 1000)
            features.append(zlib.crc32(event.get('user_id', '').encode()) % 10000)
features.append(event.get('data_volume_bytes', 0) / 1024) # KB
features.append(len(event.get('accessed_resources', [])))
# Network features
if event.get('ip_address'):
# Convert IP to numeric features
ip_parts = event['ip_address'].split('.')
if len(ip_parts) == 4:
features.extend([int(part) for part in ip_parts])
else:
features.extend([0, 0, 0, 0])
else:
features.extend([0, 0, 0, 0])
return features
except Exception as e:
self.logger.error(f"Feature extraction failed: {e}")
return None
async def _run_correlation_engine(self):
"""Correlate events to detect complex attack patterns"""
while True:
try:
# Get recent events
recent_events = await self._get_recent_events(minutes=5)
# Group by various keys
by_user = defaultdict(list)
by_ip = defaultdict(list)
by_session = defaultdict(list)
for event in recent_events:
if event.get('user_id'):
by_user[event['user_id']].append(event)
if event.get('ip_address'):
by_ip[event['ip_address']].append(event)
if event.get('session_id'):
by_session[event['session_id']].append(event)
# Look for attack patterns
await self._detect_brute_force(by_user, by_ip)
await self._detect_lateral_movement(by_user)
await self._detect_data_exfiltration(by_user, by_session)
await self._detect_privilege_abuse(by_user)
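                # The _detect_* helpers are assumed to be implemented elsewhere;
                # each inspects the grouped events and raises alerts on matches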
await asyncio.sleep(30) # Run every 30 seconds
except Exception as e:
self.logger.error(f"Correlation engine error: {e}")
await asyncio.sleep(60)
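The AlertManager used in the constructor above is not shown in this example. One plausible minimal shape for it, assuming alerts are deduplicated within a time window and fanned out to pluggable async notifier callables, is sketched below; the class layout and the send_alert signature are hypothetical.

class AlertManager:
    """Minimal sketch of the alert sink assumed above (hypothetical design)."""
    def __init__(self, config, notifiers=None):
        self.config = config
        self.notifiers = notifiers or []  # e.g. email, Slack, PagerDuty async callables
        self.recent = {}                  # alert fingerprint -> time last sent
        self.dedup_window = timedelta(seconds=config.get('dedup_seconds', 300))

    async def send_alert(self, event, detection):
        # Suppress duplicates of the same rule/user/IP inside the window
        fingerprint = (detection.get('rule') or detection.get('type'),
                       event.get('user_id'), event.get('ip_address'))
        now = datetime.utcnow()
        last = self.recent.get(fingerprint)
        if last and now - last < self.dedup_window:
            return
        self.recent[fingerprint] = now
        for notify in self.notifiers:
            await notify(event, detection)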
class ThreatIntelligenceIntegration:
    """Integrate external threat intelligence feeds"""
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.threat_feeds = {}
        # IOCDatabase (the indicator-of-compromise store) is assumed to be
        # implemented elsewhere
        self.ioc_database = IOCDatabase()
async def update_threat_feeds(self):
"""Update threat intelligence from various sources"""
# Update from commercial feeds
for feed_name, feed_config in self.config['feeds'].items():
try:
if feed_config['type'] == 'stix':
await self._update_stix_feed(feed_name, feed_config)
elif feed_config['type'] == 'csv':
await self._update_csv_feed(feed_name, feed_config)
elif feed_config['type'] == 'api':
await self._update_api_feed(feed_name, feed_config)
except Exception as e:
self.logger.error(f"Failed to update {feed_name}: {e}")
# Update internal threat intelligence
await self._update_internal_intelligence()
# Clean expired indicators
await self.ioc_database.clean_expired()
async def check_ioc(self, indicator_value, indicator_type):
"""Check if indicator matches known threats"""
matches = await self.ioc_database.search(
value=indicator_value,
type=indicator_type
)
        if matches:
            # Aggregate threat info from multiple sources; severities are compared
            # by numeric rank so 'critical' outranks 'high' (string max() would not)
            threat_info = {
                'severity': max((m['severity'] for m in matches),
                                key=lambda s: SEVERITY_RANK.get(s, 0)),
                'confidence': max(m['confidence'] for m in matches),
                'sources': list(set(m['source'] for m in matches)),
                'first_seen': min(m['first_seen'] for m in matches),
                'last_seen': max(m['last_seen'] for m in matches),
                'tags': list(set(tag for m in matches for tag in m.get('tags', [])))
            }
return threat_info
return None
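Wiring the system together could look like the sketch below. The endpoints and the sample event are illustrative placeholders, and the helper methods omitted above would still need real implementations (plus running Elasticsearch and Redis instances) before this runs end to end.

# Sketch: illustrative wiring; endpoint values and the sample event are made up
async def main():
    config = {
        'elasticsearch': 'http://localhost:9200',
        'redis_url': 'redis://localhost:6379',
        'alerting': {'dedup_seconds': 300},
    }
    monitor = SecurityMonitoringSystem(config)
    await monitor.initialize()

    # Feed in a single synthetic event; 203.0.113.42 is a documentation address
    await monitor.ingest_event({
        'event_type': 'authentication_failure',
        'user_id': 'jdoe',
        'ip_address': '203.0.113.42',
        'timestamp': datetime.utcnow().isoformat(),
    })

if __name__ == '__main__':
    asyncio.run(main())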