Advanced Detection and Response

Advanced Detection and Response

Implement sophisticated detection mechanisms that identify attack patterns beyond simple rate limiting. Machine learning and behavioral analysis can detect subtle attack variations.

Create an intelligent SSH attack detector:

#!/usr/bin/env python3
# ssh-attack-detector.py
# Advanced SSH attack detection with pattern analysis

import collections
import ipaddress
import re
import subprocess
import time
from datetime import datetime, timedelta

import geoip2.database
import numpy as np
from sklearn.ensemble import IsolationForest

class SSHAttackDetector:
    """Score failed SSH logins per source IP and ban offenders via fail2ban.

    Each log line handed to :meth:`analyze_log_entry` is matched against two
    sshd failure patterns. Matching lines are recorded per source IP, a
    heuristic score is recomputed for that IP, and the IP is handed to
    ``fail2ban-client`` once the score exceeds ``BLOCK_THRESHOLD``.
    """

    # Score above which block_attacker() is invoked.
    BLOCK_THRESHOLD = 100
    # Only attempts newer than this window contribute to the score.
    RECENT_WINDOW = timedelta(minutes=5)
    # Usernames whose presence among recent attempts adds a fixed bonus.
    COMMON_TARGET_USERS = frozenset({'root', 'admin', 'test', 'guest'})
    # ISO country codes that add a fixed bonus to the score.
    HIGH_RISK_COUNTRIES = frozenset({'CN', 'RU', 'KP', 'IR'})
    # fail2ban jail that receives the ban requests.
    JAIL_NAME = 'sshd-aggressive'
    # File that block actions are appended to.
    LOG_PATH = '/var/log/ssh-attack-detector.log'

    def __init__(self, geoip_db_path='/usr/share/GeoIP/GeoLite2-Country.mmdb'):
        """Set up empty per-IP state and open the GeoIP country database.

        Args:
            geoip_db_path: Path of the GeoLite2 country database to open.
                Defaults to the location the original script hard-coded.
        """
        # ip -> list of {'timestamp', 'user', 'type'} dicts, in arrival order.
        self.failed_attempts = collections.defaultdict(list)
        # ip -> set of every username that IP has tried.
        self.user_patterns = collections.defaultdict(set)
        self.geo_reader = geoip2.database.Reader(geoip_db_path)
        # NOTE(review): the model is created but never fitted or consulted by
        # any method of this class; scoring below is purely rule-based.
        self.model = IsolationForest(contamination=0.1)
        # ip -> most recently computed score.
        self.attack_scores = {}

    def analyze_log_entry(self, line):
        """Inspect one log line and record it if it is a failed login.

        Lines matching neither pattern are ignored.

        Args:
            line: A single line of sshd log text.
        """
        failed_match = re.search(r'Failed password for (\S+) from (\S+)', line)
        if failed_match:
            user, ip = failed_match.groups()
            self.process_failed_attempt(user, ip, 'failed_password')
            return

        invalid_match = re.search(r'Invalid user (\S+) from (\S+)', line)
        if invalid_match:
            user, ip = invalid_match.groups()
            self.process_failed_attempt(user, ip, 'invalid_user')

    def process_failed_attempt(self, user, ip, attempt_type):
        """Record a failed attempt, rescore the IP and block it if warranted.

        Args:
            user: Username taken from the log line.
            ip: Source address token taken from the log line.
            attempt_type: ``'failed_password'`` or ``'invalid_user'``.
        """
        # The attempt is stamped with the time it is processed, not with the
        # timestamp printed in the log line.
        self.failed_attempts[ip].append({
            'timestamp': datetime.now(),
            'user': user,
            'type': attempt_type,
        })
        self.user_patterns[ip].add(user)

        score = self.calculate_attack_score(ip)
        self.attack_scores[ip] = score

        if score > self.BLOCK_THRESHOLD:
            self.block_attacker(ip, score)

    def calculate_attack_score(self, ip):
        """Return the heuristic attack score for ``ip``.

        The score is the sum of:
          * 10 per attempt inside ``RECENT_WINDOW``;
          * 5 per distinct username ever tried by this IP;
          * 20 if any recent attempt used a name in ``COMMON_TARGET_USERS``;
          * 30 if the GeoIP country is in ``HIGH_RISK_COUNTRIES``, or 10 if
            the lookup fails for any reason;
          * 25 if the gaps between consecutive recent attempts have a
            standard deviation below one second.

        Args:
            ip: Source address to score.

        Returns:
            The integer score.
        """
        # Compute the cutoff once so every attempt is compared against the
        # same instant.
        cutoff = datetime.now() - self.RECENT_WINDOW
        recent = [a for a in self.failed_attempts[ip] if a['timestamp'] > cutoff]

        score = len(recent) * 10
        score += len(self.user_patterns[ip]) * 5

        if any(a['user'] in self.COMMON_TARGET_USERS for a in recent):
            score += 20

        # Geographic risk. Any lookup failure (address not in the database,
        # token that is not an IP address, reader unavailable) counts as an
        # unknown location. Catching Exception instead of using a bare except
        # lets KeyboardInterrupt and SystemExit propagate.
        try:
            country = self.geo_reader.country(ip).country.iso_code
        except Exception:
            score += 10
        else:
            if country in self.HIGH_RISK_COUNTRIES:
                score += 30

        # Timing analysis: near-constant spacing suggests an automated tool.
        # NOTE(review): with exactly two recent attempts there is a single
        # interval whose standard deviation is always 0, so the bonus applies.
        if len(recent) > 1:
            intervals = [
                (later['timestamp'] - earlier['timestamp']).total_seconds()
                for earlier, later in zip(recent, recent[1:])
            ]
            if np.std(intervals) < 1.0:
                score += 25

        return score

    def block_attacker(self, ip, score):
        """Ask fail2ban to ban ``ip`` and append the action to the log file.

        ``ip`` comes from untrusted log text, so it is validated as an IP
        address and passed to ``fail2ban-client`` as a separate argument with
        no shell involved. Tokens that are not valid addresses are refused
        and nothing is executed or logged for them.

        Args:
            ip: Source address to ban.
            score: Score that triggered the ban; selects the logged duration.
        """
        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            print(f"Refusing to block invalid address {ip!r} (score {score})")
            return

        print(f"Blocking {ip} with score {score}")

        # Ban duration tier chosen from the score.
        # NOTE(review): this value is only written to the log below; it is
        # not passed to fail2ban, whose jail configuration decides the
        # effective ban time.
        if score > 200:
            ban_duration = 86400  # 24 hours
        elif score > 150:
            ban_duration = 3600   # 1 hour
        else:
            ban_duration = 600    # 10 minutes

        cmd = ['fail2ban-client', 'set', self.JAIL_NAME, 'banip', str(address)]
        try:
            result = subprocess.run(cmd, check=False)
        except OSError as exc:
            # Stay best-effort when the client is missing or not executable,
            # as the original os.system() call did.
            print(f"Could not run fail2ban-client: {exc}")
        else:
            if result.returncode != 0:
                print(f"fail2ban-client exited with status {result.returncode}")

        with open(self.LOG_PATH, 'a') as f:
            f.write(f"{datetime.now()} - Blocked {ip} for {ban_duration}s (score: {score})\n")