Security Analytics and Threat Intelligence

Security Analytics and Threat Intelligence

Advanced analytics transform raw security data into actionable intelligence. By applying statistical analysis, machine learning, and threat intelligence feeds, organizations can identify sophisticated attacks and predict future threats.

# Python security analytics platform
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import networkx as nx
from typing import List, Dict, Tuple
import asyncio
from datetime import datetime, timedelta

class SecurityAnalytics:
    def __init__(self, data_store, threat_intel_client):
        self.data_store = data_store
        self.threat_intel = threat_intel_client
        self.models = {}
        self.initialize_models()
        
    def initialize_models(self):
        """Initialize ML models for security analytics"""
        # Anomaly detection model
        self.models['anomaly_detector'] = IsolationForest(
            contamination=0.01,
            random_state=42
        )
        
        # Feature scaler
        self.scaler = StandardScaler()
        
    async def analyze_user_behavior(self, user_id: str, 
                                  time_window: timedelta = timedelta(days=30)):
        """Analyze user behavior for anomalies"""
        # Fetch user activity data
        end_time = datetime.utcnow()
        start_time = end_time - time_window
        
        activities = await self.data_store.get_user_activities(
            user_id, start_time, end_time
        )
        
        if len(activities) < 10:
            return None  # Insufficient data
            
        # Extract features
        features = self.extract_user_features(activities)
        
        # Detect anomalies
        anomalies = self.detect_anomalies(features)
        
        # Calculate risk score
        risk_score = self.calculate_user_risk_score(
            user_id, features, anomalies
        )
        
        return {
            'user_id': user_id,
            'risk_score': risk_score,
            'anomalies': anomalies,
            'behavior_profile': self.create_behavior_profile(features),
            'recommendations': self.generate_recommendations(risk_score, anomalies)
        }
    
    def extract_user_features(self, activities: List[Dict]) -> pd.DataFrame:
        """Extract behavioral features from user activities"""
        features = []
        
        for activity in activities:
            feature_vector = {
                'hour_of_day': activity['timestamp'].hour,
                'day_of_week': activity['timestamp'].weekday(),
                'endpoint_entropy': self.calculate_endpoint_entropy(activity),
                'request_rate': self.calculate_request_rate(activity),
                'error_rate': self.calculate_error_rate(activity),
                'data_volume': activity.get('response_size', 0),
                'unique_ips': len(set(a.get('ip_address') for a in activities)),
                'auth_failures': sum(1 for a in activities 
                                   if a.get('event_type') == 'auth_failure'),
                'privilege_requests': sum(1 for a in activities 
                                        if 'admin' in a.get('endpoint', ''))
            }
            
            features.append(feature_vector)
            
        return pd.DataFrame(features)
    
    def detect_attack_patterns(self, 
                             time_window: timedelta = timedelta(hours=1)):
        """Detect coordinated attack patterns"""
        end_time = datetime.utcnow()
        start_time = end_time - time_window
        
        # Build activity graph
        G = nx.DiGraph()
        
        activities = self.data_store.get_activities(start_time, end_time)
        
        for activity in activities:
            # Add nodes
            G.add_node(activity['ip_address'], type='ip')
            G.add_node(activity['user_id'], type='user')
            G.add_node(activity['endpoint'], type='endpoint')
            
            # Add edges
            G.add_edge(activity['ip_address'], activity['user_id'], 
                      weight=1, timestamp=activity['timestamp'])
            G.add_edge(activity['user_id'], activity['endpoint'], 
                      weight=1, timestamp=activity['timestamp'])
        
        # Detect suspicious patterns
        patterns = []
        
        # Pattern 1: Multiple users from same IP
        for ip in [n for n in G.nodes() if G.nodes[n]['type'] == 'ip']:
            users = list(G.successors(ip))
            if len(users) > 5:
                patterns.append({
                    'type': 'multiple_users_same_ip',
                    'ip': ip,
                    'users': users,
                    'severity': 'HIGH'
                })
        
        # Pattern 2: Rapid endpoint scanning
        for user in [n for n in G.nodes() if G.nodes[n]['type'] == 'user']:
            endpoints = list(G.successors(user))
            if len(endpoints) > 20:
                patterns.append({
                    'type': 'endpoint_scanning',
                    'user': user,
                    'endpoints_accessed': len(endpoints),
                    'severity': 'MEDIUM'
                })
        
        # Pattern 3: Coordinated activity
        communities = nx.community.louvain_communities(G.to_undirected())
        for community in communities:
            if len(community) > 10:
                patterns.append({
                    'type': 'coordinated_activity',
                    'entities': list(community),
                    'size': len(community),
                    'severity': 'HIGH'
                })
        
        return patterns
    
    async def correlate_with_threat_intel(self, indicators: List[str]):
        """Correlate observed indicators with threat intelligence"""
        threats = []
        
        for indicator in indicators:
            # Check IP reputation
            if self.is_ip_address(indicator):
                reputation = await self.threat_intel.check_ip_reputation(indicator)
                if reputation['malicious']:
                    threats.append({
                        'indicator': indicator,
                        'type': 'ip',
                        'threat_level': reputation['threat_level'],
                        'categories': reputation['categories'],
                        'first_seen': reputation['first_seen'],
                        'last_seen': reputation['last_seen']
                    })
            
            # Check domain reputation
            elif self.is_domain(indicator):
                reputation = await self.threat_intel.check_domain_reputation(indicator)
                if reputation['malicious']:
                    threats.append({
                        'indicator': indicator,
                        'type': 'domain',
                        'threat_level': reputation['threat_level'],
                        'malware_families': reputation.get('malware_families', [])
                    })
        
        return threats
    
    def generate_security_report(self, time_period: str = 'daily'):
        """Generate comprehensive security report"""
        report = {
            'period': time_period,
            'generated_at': datetime.utcnow().isoformat(),
            'summary': {},
            'top_threats': [],
            'trends': {},
            'recommendations': []
        }
        
        # Calculate summary statistics
        report['summary'] = {
            'total_requests': self.data_store.count_events('api_request'),
            'failed_authentications': self.data_store.count_events(
                'authentication', 
                filter={'success': False}
            ),
            'authorization_denials': self.data_store.count_events(
                'authorization',
                filter={'granted': False}
            ),
            'unique_attackers': len(self.get_unique_attackers()),
            'blocked_ips': len(self.get_blocked_ips()),
        }
        
        # Identify top threats
        report['top_threats'] = self.identify_top_threats()
        
        # Analyze trends
        report['trends'] = self.analyze_security_trends()
        
        # Generate recommendations
        report['recommendations'] = self.generate_recommendations_from_data()
        
        return report

# Automated response system
class AutomatedSecurityResponse:
    def __init__(self, security_analytics, response_config):
        self.analytics = security_analytics
        self.config = response_config
        
    async def evaluate_and_respond(self, event: Dict):
        """Evaluate security event and execute appropriate response"""
        # Calculate threat level
        threat_level = await self.calculate_threat_level(event)
        
        if threat_level >= self.config['auto_response_threshold']:
            response_actions = self.determine_response_actions(event, threat_level)
            
            for action in response_actions:
                await self.execute_response_action(action, event)
    
    async def execute_response_action(self, action: str, event: Dict):
        """Execute specific security response action"""
        if action == 'block_ip':
            duration = self.calculate_block_duration(event)
            await self.security_gateway.block_ip(event['ip_address'], duration)
            
        elif action == 'enable_captcha':
            await self.security_gateway.enable_captcha_for_ip(event['ip_address'])
            
        elif action == 'suspend_user':
            await self.user_service.suspend_user(
                event['user_id'], 
                reason='Automated security response'
            )
            
        elif action == 'increase_logging':
            await self.logging_service.increase_verbosity(
                user_id=event.get('user_id'),
                ip_address=event.get('ip_address')
            )
            
        elif action == 'notify_security':
            await self.notification_service.alert_security_team(event)

Effective monitoring and logging provide the visibility needed to maintain API security. The next chapter explores common API vulnerabilities and their remediation strategies.## Common API Security Vulnerabilities and Fixes

Understanding common API vulnerabilities is essential for building secure APIs. The OWASP API Security Top 10 provides a foundation, but real-world APIs face numerous additional threats. This chapter examines the most critical API vulnerabilities, demonstrates how attackers exploit them, and provides comprehensive remediation strategies with practical implementation examples.