Behavioral Analysis for DDoS Detection
Modern DDoS attacks often mimic legitimate traffic patterns, making traditional rate limiting insufficient. Behavioral analysis identifies attacks by examining request patterns, timing, and content rather than just volume. Machine learning models trained on normal API usage can detect subtle anomalies indicating attacks.
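To make the contrast with volume-based limits concrete, here is a minimal sketch comparing two clients that send the same number of requests in the same time span. The traffic data, the helper names `shannon_entropy` and `timing_regularity`, and the choice of coefficient of variation as a timing measure are illustrative assumptions, not part of any particular product.

```python
import math
from collections import Counter
from statistics import mean, pstdev


def shannon_entropy(items):
    """Shannon entropy (in bits) of the empirical distribution of items."""
    counts = Counter(items)
    total = sum(counts.values())
    return sum(-(c / total) * math.log2(c / total) for c in counts.values())


def timing_regularity(timestamps):
    """Coefficient of variation of the gaps between consecutive requests.

    Values near 0 mean machine-like, evenly spaced requests.
    """
    gaps = [b - a for a, b in zip(timestamps, timestamps[1:])]
    return pstdev(gaps) / mean(gaps)


# Hypothetical traffic: both clients send 9 requests in 40 seconds,
# so a pure rate limit cannot tell them apart.
human_times = [0, 2, 3, 11, 12, 19, 31, 33, 40]
bot_times = [0, 5, 10, 15, 20, 25, 30, 35, 40]

human_paths = ["/home", "/search", "/search", "/item/7", "/cart",
               "/item/9", "/search", "/cart", "/checkout"]
bot_paths = ["/login"] * 9

human_cv = timing_regularity(human_times)
bot_cv = timing_regularity(bot_times)
human_entropy = shannon_entropy(human_paths)
bot_entropy = shannon_entropy(bot_paths)

print(f"human: timing CV={human_cv:.2f}, endpoint entropy={human_entropy:.2f} bits")
print(f"bot:   timing CV={bot_cv:.2f}, endpoint entropy={bot_entropy:.2f} bits")
```

The request rate is identical for both clients, yet the evenly spaced, single-endpoint client stands out on both timing and entropy. Real attack traffic is rarely this clean, which is why these signals are usually combined and fed to a model rather than thresholded individually.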

    # Python implementation of behavioral DDoS detection
    import time
    from collections import defaultdict

    import numpy as np
    from sklearn.ensemble import IsolationForest


    class BehavioralDDoSDetector:
        def __init__(self, window_size=300):  # 5-minute windows
            self.window_size = window_size
            self.client_histories = defaultdict(lambda: {
                'requests': [],
                'endpoints': defaultdict(int),
                'user_agents': set(),
                'response_times': []
            })
            # contamination = expected share of anomalies in the training data
            self.model = IsolationForest(contamination=0.1)
            self.is_trained = False

        def train(self, feature_vectors):
            """Fit the model on feature vectors from known-good traffic"""
            self.model.fit(feature_vectors)
            self.is_trained = True

        def extract_features(self, client_history):
            """Extract behavioral features from client history"""
            if not client_history['requests']:
                return None

            now = time.time()
            recent_requests = [r for r in client_history['requests']
                               if r['timestamp'] > now - self.window_size]
            if len(recent_requests) < 5:
                return None  # too little data to judge behavior

            # Gaps between consecutive requests (requests are stored in arrival order)
            request_intervals = np.diff([r['timestamp'] for r in recent_requests])
            error_count = sum(1 for r in recent_requests if r['status'] >= 400)

            # Note: endpoint counts and user agents accumulate over the client's
            # whole lifetime; only the request list is windowed.
            features = {
                'request_rate': len(recent_requests) / self.window_size,
                'interval_mean': np.mean(request_intervals),
                'interval_std': np.std(request_intervals),
                'unique_endpoints': len(client_history['endpoints']),
                'endpoint_entropy': self.calculate_entropy(client_history['endpoints']),
                # Number of distinct user agents seen for this client
                'user_agent_changes': len(client_history['user_agents']),
                'avg_response_time': np.mean(client_history['response_times'])
                                     if client_history['response_times'] else 0,
                'error_rate': error_count / len(recent_requests)
            }
            # Dicts preserve insertion order, so the feature order is stable
            return list(features.values())

        def calculate_entropy(self, endpoint_counts):
            """Calculate Shannon entropy of endpoint distribution"""
            total = sum(endpoint_counts.values())
            if total == 0:
                return 0
            entropy = 0
            for count in endpoint_counts.values():
                if count > 0:
                    p = count / total
                    entropy -= p * np.log2(p)
            return entropy

        def analyze_request(self, client_id, request_data):
            """Analyze incoming request for DDoS patterns"""
            history = self.client_histories[client_id]
            now = time.time()

            # Update history
            history['requests'].append({
                'timestamp': now,
                'endpoint': request_data['endpoint'],
                'status': request_data['status']
            })
            history['endpoints'][request_data['endpoint']] += 1
            history['user_agents'].add(request_data['user_agent'])
            history['response_times'].append(request_data['response_time'])

            # Clean old data so per-client memory stays bounded
            cutoff_time = now - self.window_size * 2
            history['requests'] = [r for r in history['requests']
                                   if r['timestamp'] > cutoff_time]
            history['response_times'] = history['response_times'][-100:]

            # Extract features and predict
            features = self.extract_features(history)
            if features is not None and self.is_trained:
                prediction = self.model.predict([features])[0]
                # score_samples is negative; lower means more abnormal
                anomaly_score = float(self.model.score_samples([features])[0])
                return {
                    'is_anomaly': bool(prediction == -1),
                    'anomaly_score': anomaly_score,
                    # Rough anomaly strength, not a calibrated probability
                    'confidence': abs(anomaly_score)
                }
            return {'is_anomaly': False, 'anomaly_score': 0, 'confidence': 0}

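The detector only flags anomalies once its model has been fitted on baseline traffic. The following is a minimal sketch of that fitting step, assuming feature vectors in the same eight-feature order used above. The training data here is synthetic, and every distribution parameter and sample client is a made-up placeholder; in practice the vectors would come from logged traffic you trust to be legitimate.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)

# Synthetic stand-in for feature vectors from known-good traffic.
# All distribution parameters below are illustrative assumptions.
n_samples = 500
normal_features = np.column_stack([
    rng.normal(0.05, 0.01, n_samples),   # request_rate (requests/second)
    rng.normal(20.0, 3.0, n_samples),    # interval_mean (seconds)
    rng.normal(10.0, 2.0, n_samples),    # interval_std (seconds)
    rng.integers(3, 9, n_samples),       # unique_endpoints
    rng.normal(2.0, 0.3, n_samples),     # endpoint_entropy (bits)
    rng.integers(1, 3, n_samples),       # user_agent_changes
    rng.normal(0.2, 0.05, n_samples),    # avg_response_time (seconds)
    rng.uniform(0.0, 0.05, n_samples),   # error_rate
])

# random_state makes the sketch reproducible
model = IsolationForest(contamination=0.1, random_state=42)
model.fit(normal_features)

# Hypothetical clients: one near the baseline, one fast, regular and error-heavy
typical_client = [[0.05, 20.0, 10.0, 5, 2.0, 1, 0.2, 0.02]]
bot_like_client = [[5.0, 0.2, 0.001, 1, 0.0, 30, 2.0, 0.6]]

typical_prediction = int(model.predict(typical_client)[0])   # 1 = inlier
bot_prediction = int(model.predict(bot_like_client)[0])      # -1 = anomaly
typical_score = float(model.score_samples(typical_client)[0])
bot_score = float(model.score_samples(bot_like_client)[0])

print("typical:", typical_prediction, round(typical_score, 3))
print("bot-like:", bot_prediction, round(bot_score, 3))
```

One design point worth weighing: `contamination=0.1` tells the model to treat roughly 10% of the training samples as anomalous. If the baseline is believed to be clean, that setting will flag about one in ten legitimate clients, so a smaller value or a threshold tuned on `score_samples` output may be more appropriate.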
Effective API rate limiting and DDoS protection require multiple layers of defense working in concert. The next chapter explores how proper input validation prevents many attacks from succeeding even if they bypass rate limiting controls.

## Input Validation and SQL Injection Prevention
Input validation forms the cornerstone of API security, protecting against a wide range of attacks including SQL injection, command injection, and cross-site scripting. Every piece of data entering your API represents a potential security threat if not properly validated and sanitized. This chapter provides comprehensive guidance on implementing robust input validation strategies and preventing injection attacks that could compromise your entire system.