Behavioral Analysis for DDoS Detection

Modern DDoS attacks often mimic legitimate traffic patterns, so rate limiting alone is not enough to stop them. Behavioral analysis identifies attacks by examining request patterns, timing, and content rather than volume alone. An anomaly detection model trained on normal API usage can flag clients whose behavior deviates from that baseline, even when their request rate looks unremarkable.
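
To make the point about timing concrete, here is a minimal sketch comparing two clients that send the same number of requests over the same period. The helper `timing_profile`, the `interval_cv` measure (standard deviation of the gaps divided by their mean), and both timestamp lists are illustrative assumptions, not part of any library or of real traffic data.

```python
import statistics


def timing_profile(timestamps):
    """Return (request_rate, interval_cv) for sorted timestamps in seconds.

    Assumes at least two timestamps and a nonzero total duration.
    """
    intervals = [b - a for a, b in zip(timestamps, timestamps[1:])]
    duration = timestamps[-1] - timestamps[0]
    rate = len(intervals) / duration
    # Coefficient of variation: spread of the gaps relative to their mean.
    cv = statistics.pstdev(intervals) / statistics.mean(intervals)
    return rate, cv


# Hypothetical scripted client: one request exactly every 2 seconds.
scripted = [0, 2, 4, 6, 8, 10, 12]
# Hypothetical human client: same request count over the same 12 seconds, in bursts.
human = [0, 0.5, 1, 5, 5.5, 11, 12]

scripted_rate, scripted_cv = timing_profile(scripted)
human_rate, human_cv = timing_profile(human)

print(f"scripted: rate={scripted_rate:.2f} req/s, interval_cv={scripted_cv:.2f}")
print(f"human:    rate={human_rate:.2f} req/s, interval_cv={human_cv:.2f}")
```

Both clients have an identical request rate, so a volume threshold cannot separate them; only the regularity of the gaps does. Perfectly even spacing is a hint rather than proof of automation, which is why it is better treated as one feature among several than as a rule on its own.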

```python
# Python implementation of behavioral DDoS detection
import time
from collections import defaultdict, deque

import numpy as np
from sklearn.ensemble import IsolationForest


class BehavioralDDoSDetector:
    def __init__(self, window_size=300):  # 5-minute windows
        self.window_size = window_size
        self.client_histories = defaultdict(lambda: {
            'requests': [],
            'endpoints': defaultdict(int),        # cumulative per-endpoint counts
            'user_agents': set(),                 # cumulative set of user agents seen
            'response_times': deque(maxlen=100)   # only the last 100 samples are kept
        })
        # contamination is the expected share of anomalies in the training data;
        # 0.1 is a starting value to tune against real traffic.
        self.model = IsolationForest(contamination=0.1)
        self.is_trained = False

    def train(self, feature_vectors):
        """Fit the model on feature vectors (as returned by extract_features)
        from known-good traffic. Minimal fit step; collecting the baseline
        is left to the caller."""
        self.model.fit(np.asarray(feature_vectors, dtype=float))
        self.is_trained = True

    def extract_features(self, client_history):
        """Extract behavioral features from client history"""
        if not client_history['requests']:
            return None

        now = time.time()
        recent_requests = [r for r in client_history['requests']
                           if r['timestamp'] > now - self.window_size]

        # Too few requests to say anything about timing
        if len(recent_requests) < 5:
            return None

        # Gaps between consecutive requests (requests are stored in arrival order)
        request_intervals = np.diff([r['timestamp'] for r in recent_requests])
        response_times = client_history['response_times']

        features = {
            'request_rate': len(recent_requests) / self.window_size,
            'interval_mean': float(np.mean(request_intervals)),
            'interval_std': float(np.std(request_intervals)),
            'unique_endpoints': len(client_history['endpoints']),
            'endpoint_entropy': self.calculate_entropy(client_history['endpoints']),
            'user_agent_changes': len(client_history['user_agents']),
            'avg_response_time': float(np.mean(response_times)) if response_times else 0.0,
            'error_rate': sum(1 for r in recent_requests if r['status'] >= 400) / len(recent_requests)
        }

        # Dicts preserve insertion order, so the feature order is stable
        return list(features.values())

    def calculate_entropy(self, endpoint_counts):
        """Calculate Shannon entropy (in bits) of endpoint distribution"""
        total = sum(endpoint_counts.values())
        if total == 0:
            return 0.0

        entropy = 0.0
        for count in endpoint_counts.values():
            if count > 0:
                p = count / total
                entropy -= p * np.log2(p)

        return float(entropy)

    def analyze_request(self, client_id, request_data):
        """Analyze incoming request for DDoS patterns"""
        history = self.client_histories[client_id]
        now = time.time()

        # Update history
        history['requests'].append({
            'timestamp': now,
            'endpoint': request_data['endpoint'],
            'status': request_data['status']
        })
        history['endpoints'][request_data['endpoint']] += 1
        history['user_agents'].add(request_data['user_agent'])
        history['response_times'].append(request_data['response_time'])

        # Clean old data (keep two windows of request records)
        cutoff_time = now - self.window_size * 2
        history['requests'] = [r for r in history['requests']
                               if r['timestamp'] > cutoff_time]

        # Extract features and predict
        features = self.extract_features(history)
        if features is not None and self.is_trained:
            prediction = self.model.predict([features])[0]
            # score_samples is negative; lower values mean more anomalous
            anomaly_score = float(self.model.score_samples([features])[0])

            return {
                'is_anomaly': bool(prediction == -1),
                'anomaly_score': anomaly_score,
                'confidence': abs(anomaly_score)
            }

        return {'is_anomaly': False, 'anomaly_score': 0, 'confidence': 0}
```
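
The detector only scores requests once `is_trained` is true, which presupposes that the Isolation Forest has been fitted on feature vectors from baseline traffic. The following is a minimal sketch of that fit-and-score step in isolation. It assumes feature vectors in the same eight-feature order that `extract_features` produces, and it substitutes synthetic, made-up numbers for real traffic; the `baseline` and `flood` arrays and every distribution parameter are illustrative assumptions.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)  # fixed seed so the sketch is repeatable

# Synthetic stand-in for feature vectors from known-good clients.
# Column order: request_rate, interval_mean, interval_std, unique_endpoints,
# endpoint_entropy, user_agent_changes, avg_response_time, error_rate.
n = 500
baseline = np.column_stack([
    rng.normal(0.2, 0.05, n),    # request_rate (requests/second)
    rng.normal(5.0, 1.0, n),     # interval_mean (seconds)
    rng.normal(3.0, 0.5, n),     # interval_std (seconds)
    rng.integers(3, 9, n),       # unique_endpoints
    rng.normal(2.0, 0.3, n),     # endpoint_entropy (bits)
    rng.integers(1, 3, n),       # user_agent_changes
    rng.normal(0.12, 0.02, n),   # avg_response_time (seconds)
    rng.uniform(0.0, 0.05, n),   # error_rate
])

# contamination="auto" uses the library's default decision threshold instead
# of assuming a fixed share of anomalies in the training data.
model = IsolationForest(contamination="auto", random_state=42)
model.fit(baseline)

# Hypothetical flood client: very high rate, near-constant spacing, a single
# endpoint, many user agents, slow responses, and a high error rate.
flood = np.array([[50.0, 0.02, 0.001, 1, 0.0, 40, 0.9, 0.6]])

flood_label = int(model.predict(flood)[0])           # -1 means anomaly, 1 means normal
flood_score = float(model.score_samples(flood)[0])   # lower means more anomalous
baseline_mean_score = float(model.score_samples(baseline).mean())

print("flood label:", flood_label)
print("flood score:", round(flood_score, 3))
print("mean baseline score:", round(baseline_mean_score, 3))
```

In practice the baseline would come from feature vectors recorded during a period believed to be attack-free, and the model would be refitted periodically as legitimate usage drifts. A baseline that already contains attack traffic teaches the model to treat that traffic as normal.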

Effective API rate limiting and DDoS protection require multiple layers of defense working in concert. The next chapter explores how proper input validation prevents many attacks from succeeding even if they bypass rate limiting controls.

## Input Validation and SQL Injection Prevention

Input validation forms the cornerstone of API security, protecting against a wide range of attacks including SQL injection, command injection, and cross-site scripting. Every piece of data entering your API represents a potential security threat if not properly validated and sanitized. This chapter provides comprehensive guidance on implementing robust input validation strategies and preventing injection attacks that could compromise your entire system.