Advanced Detection and Response
Advanced Detection and Response
Implement sophisticated detection mechanisms that identify attack patterns beyond simple rate limiting. Machine learning and behavioral analysis can detect subtle attack variations.
Create an intelligent SSH attack detector:
#!/usr/bin/env python3
# ssh-attack-detector.py
# Advanced SSH attack detection with pattern analysis
import collections
import ipaddress
import re
import subprocess
import time
from datetime import datetime, timedelta

import geoip2.database
import numpy as np
from sklearn.ensemble import IsolationForest
class SSHAttackDetector:
    """Score SSH authentication failures per source IP and ban offenders.

    Each failed attempt parsed from a log line is recorded against its source
    address, a heuristic score is recomputed for that address, and addresses
    whose score exceeds ``BLOCK_THRESHOLD`` are handed to fail2ban.

    Attempts are timestamped with the wall-clock time at which the line is
    processed, not with the time printed in the log line, so the scoring is
    only meaningful when the log is followed live.
    """

    # Score above which an address is banned.
    BLOCK_THRESHOLD = 100
    # Only attempts inside this window count as "recent".
    RECENT_WINDOW = timedelta(minutes=5)
    # Usernames that are typical targets of dictionary attacks.
    COMMON_TARGET_USERS = frozenset({'root', 'admin', 'test', 'guest'})
    # ISO country codes that add to the score.
    HIGH_RISK_COUNTRIES = frozenset({'CN', 'RU', 'KP', 'IR'})
    # fail2ban jail that receives the bans.
    FAIL2BAN_JAIL = 'sshd-aggressive'
    # File that receives one line per ban attempt.
    LOG_PATH = '/var/log/ssh-attack-detector.log'

    # (compiled pattern, attempt type) pairs, tried in order; first hit wins.
    # The optional "invalid user " prefix makes
    # "Failed password for invalid user NAME from IP" yield NAME rather than
    # failing to match at all.
    _LOG_PATTERNS = (
        (re.compile(r'Failed password for (?:invalid user )?(\S+) from (\S+)'),
         'failed_password'),
        (re.compile(r'Invalid user (\S+) from (\S+)'), 'invalid_user'),
    )

    def __init__(self, geoip_db_path='/usr/share/GeoIP/GeoLite2-Country.mmdb'):
        """Set up empty per-IP state and open the GeoIP country database.

        Args:
            geoip_db_path: Path of the GeoLite2 country database to open.
        """
        # ip -> list of {'timestamp', 'user', 'type'} dicts, in arrival order.
        self.failed_attempts = collections.defaultdict(list)
        # ip -> set of every username that address has tried.
        self.user_patterns = collections.defaultdict(set)
        self.geo_reader = geoip2.database.Reader(geoip_db_path)
        # Created here but not used by any method of this class.
        self.model = IsolationForest(contamination=0.1)
        # ip -> most recently computed score.
        self.attack_scores = {}

    def analyze_log_entry(self, line: str) -> None:
        """Parse one log line and record it if it reports a failed login.

        Lines that match none of the known failure patterns are ignored.

        Args:
            line: A single line of SSH daemon log output.
        """
        # NOTE(review): both fields are captured with \S+, so a username that
        # contains spaces can shift what lands in the "ip" group. The value is
        # therefore validated in block_attacker before it reaches a command.
        for pattern, attempt_type in self._LOG_PATTERNS:
            match = pattern.search(line)
            if match:
                user, ip = match.groups()
                self.process_failed_attempt(user, ip, attempt_type)
                return

    def process_failed_attempt(self, user: str, ip: str, attempt_type: str) -> None:
        """Record a failed attempt, rescore its source and ban it if needed.

        Args:
            user: Username that was tried.
            ip: Source address token taken from the log line.
            attempt_type: ``'failed_password'`` or ``'invalid_user'``.
        """
        self.failed_attempts[ip].append({
            'timestamp': datetime.now(),
            'user': user,
            'type': attempt_type,
        })
        self.user_patterns[ip].add(user)

        score = self.calculate_attack_score(ip)
        self.attack_scores[ip] = score

        if score > self.BLOCK_THRESHOLD:
            self.block_attacker(ip, score)

    def calculate_attack_score(self, ip: str) -> int:
        """Return the heuristic attack score for ``ip``.

        The score is the sum of:

        * 10 per attempt inside ``RECENT_WINDOW``;
        * 5 per distinct username ever tried from the address;
        * 20 if any recent attempt used a name in ``COMMON_TARGET_USERS``;
        * 30 if the address geolocates to ``HIGH_RISK_COUNTRIES``, or 10 if
          the lookup fails for any reason;
        * 25 if the gaps between recent attempts are nearly constant.

        Args:
            ip: Source address to score.

        Returns:
            The score as an integer.
        """
        # Compute the cutoff once so every attempt is compared to the same
        # instant.
        cutoff = datetime.now() - self.RECENT_WINDOW
        recent = [a for a in self.failed_attempts[ip] if a['timestamp'] > cutoff]

        score = len(recent) * 10
        score += len(self.user_patterns[ip]) * 5

        if any(a['user'] in self.COMMON_TARGET_USERS for a in recent):
            score += 20

        try:
            country = self.geo_reader.country(ip).country.iso_code
        except Exception:
            # Best effort: an unknown or unparsable address is mildly
            # suspicious, but must not abort scoring. Unlike a bare except,
            # this lets KeyboardInterrupt and SystemExit propagate.
            score += 10
        else:
            if country in self.HIGH_RISK_COUNTRIES:
                score += 30

        # The spread of a single interval is always zero, so at least two
        # intervals (three attempts) are needed before low variance says
        # anything about automation.
        if len(recent) > 2:
            intervals = [
                (later['timestamp'] - earlier['timestamp']).total_seconds()
                for earlier, later in zip(recent, recent[1:])
            ]
            if np.std(intervals) < 1.0:
                score += 25

        return score

    @staticmethod
    def _ban_duration(score: int) -> int:
        """Map a score to the ban length, in seconds, recorded in the log."""
        if score > 200:
            return 86400  # 24 hours
        if score > 150:
            return 3600  # 1 hour
        return 600  # 10 minutes

    def block_attacker(self, ip: str, score: int) -> None:
        """Ask fail2ban to ban ``ip`` and append the outcome to the log file.

        The address comes from log text that a remote client can influence,
        so it is validated as an IP literal and passed as a single argv
        element; no shell is involved.

        Args:
            ip: Address to ban. Anything that is not a valid IPv4/IPv6
                literal is rejected without running a command.
            score: Score that triggered the ban; selects the logged duration.
        """
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            print(f"Refusing to block invalid address {ip!r}")
            return

        print(f"Blocking {ip} with score {score}")

        # NOTE(review): the duration is only written to the log; the banip
        # command below is not given it, so the effective ban length
        # presumably comes from the jail's own configuration - confirm.
        ban_duration = self._ban_duration(score)

        cmd = ['fail2ban-client', 'set', self.FAIL2BAN_JAIL, 'banip', ip]
        try:
            banned = subprocess.run(cmd, check=False).returncode == 0
        except OSError as exc:
            # fail2ban-client missing or not executable: report it and carry
            # on instead of crashing the detector.
            print(f"Could not run fail2ban-client: {exc}")
            banned = False

        status = "Blocked" if banned else "Failed to block"
        with open(self.LOG_PATH, 'a') as f:
            f.write(f"{datetime.now()} - {status} {ip} for {ban_duration}s (score: {score})\n")