Real-time Security Monitoring

Real-time monitoring enables detection of, and response to, security incidents as they happen. It is effective only when detection rules are tuned to balance sensitivity against the false positive rate: automated alerting should bring critical events to immediate attention without burying responders in noise and causing alert fatigue.
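
One common way to limit alert fatigue is to suppress repeats of the same alert for a cooldown period. The sketch below is illustrative only: the `AlertThrottle` class, the five-minute cooldown, and the `("failed_logon", "HOST01")` key are assumptions made for the example, not part of any particular monitoring product.

```python
from datetime import datetime, timedelta


class AlertThrottle:
    """Suppress repeats of the same alert key inside a cooldown window."""

    def __init__(self, cooldown=timedelta(minutes=5)):
        self.cooldown = cooldown
        self.last_sent = {}    # alert key -> time of the last delivered alert
        self.suppressed = {}   # alert key -> repeats dropped since that alert

    def should_send(self, key, now=None):
        """Return True if an alert for `key` should be delivered at `now`."""
        now = now or datetime.now()
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            # Still inside the cooldown: count the repeat instead of alerting
            self.suppressed[key] = self.suppressed.get(key, 0) + 1
            return False
        self.last_sent[key] = now
        self.suppressed[key] = 0
        return True


throttle = AlertThrottle(cooldown=timedelta(minutes=5))
start = datetime(2024, 1, 1, 12, 0, 0)
key = ("failed_logon", "HOST01")  # hypothetical (event type, host) pair

first = throttle.should_send(key, now=start)
repeat = throttle.should_send(key, now=start + timedelta(minutes=1))
later = throttle.should_send(key, now=start + timedelta(minutes=6))
print(first, repeat, later)  # True False True
```

Keying the throttle on event type plus host keeps a noisy machine from hiding alerts about other machines. The suppressed count could be included in the next delivered alert so that no information is lost.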

Windows real-time monitoring with PowerShell:

# Real-time security event monitor
# Event IDs: 4625 = failed logon, 4720 = user account created,
#            4732 = member added to a security-enabled local group
Register-WmiEvent -Query "SELECT * FROM __InstanceCreationEvent WITHIN 1 WHERE TargetInstance ISA 'Win32_NTLogEvent' AND TargetInstance.Logfile = 'Security' AND (TargetInstance.EventCode = 4625 OR TargetInstance.EventCode = 4720 OR TargetInstance.EventCode = 4732)" -Action {
    # Use a name other than $Event, which is an automatic variable inside event actions
    $LogEvent = $EventArgs.NewEvent.TargetInstance

    # Send-MailMessage also needs a sender and an SMTP server; both values below are placeholders
    $Mail = @{
        To         = "[email protected]"
        From       = "security-monitor@example.com"
        SmtpServer = "smtp.example.com"
    }

    switch ($LogEvent.EventCode) {
        4625 {
            $Message = "Failed logon attempt detected on $($LogEvent.ComputerName)"
            Send-MailMessage @Mail -Subject "Security Alert: Failed Logon" -Body $Message
        }
        4720 {
            $Message = "New user account created on $($LogEvent.ComputerName)"
            Send-MailMessage @Mail -Subject "Security Alert: Account Created" -Body $Message
        }
        4732 {
            # 4732 covers every security-enabled local group, not only Administrators
            $Message = "Member added to a security-enabled local group on $($LogEvent.ComputerName)"
            Send-MailMessage @Mail -Subject "CRITICAL: Local Group Change" -Body $Message -Priority High
        }
    }
}

Linux real-time monitoring with auditd and custom scripts:

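#!/bin/bash
# Real-time audit log monitor

# Event ID of the most recent record tagged key="identity"
identity_event=""

# Monitor specific audit keys in real-time; -F keeps following across log rotation
tail -F /var/log/audit/audit.log | while IFS= read -r line; do
    # All records of one audit event share the ID inside msg=audit(...)
    event_id=$(grep -oP 'msg=audit\(\K[^)]+' <<< "$line")

    # Check for privileged command execution
    if grep -q 'key="privileged"' <<< "$line"; then
        # \b keeps uid= from also matching auid=, euid=, suid= and similar fields
        audit_uid=$(grep -oP '\buid=\K[0-9]+' <<< "$line" | head -n 1)
        exe_path=$(grep -oP '\bexe="\K[^"]+' <<< "$line")
        echo "ALERT: Privileged command executed by UID $audit_uid: $exe_path" | mail -s "Security Alert" "[email protected]"
    fi

    # Check for file integrity violations. The key is logged on the SYSCALL record,
    # while the file name is on a later PATH record of the same event.
    if grep -q 'key="identity"' <<< "$line"; then
        identity_event="$event_id"
    elif [[ -n "$identity_event" && "$event_id" == "$identity_event" && "$line" == *"type=PATH "* ]]; then
        file=$(grep -oP '\bname="\K[^"]+' <<< "$line")
        if [[ -n "$file" ]]; then
            echo "CRITICAL: System file modified: $file" | mail -s "File Integrity Alert" "[email protected]"
        fi
    fi

    # Check for failed authentication
    if grep -q 'type=USER_AUTH.*res=failed' <<< "$line"; then
        # Avoid the name USER, which is a standard environment variable
        account=$(grep -oP '\bacct="\K[^"]+' <<< "$line")
        echo "WARNING: Failed authentication for user: $account" | logger -t security_monitor
    fi
done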
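
Because auditd writes one event as several records (for example a SYSCALL record carrying the key and a PATH record carrying the file name), line-by-line matching can miss the relationship between them. The following sketch groups records by the event ID in `msg=audit(...)`. The helper names (`parse_record`, `group_events`, `identity_changes`) and the sample records are made up for illustration, and the field parser is deliberately simple: it does not handle hex-encoded values or the nested `msg='...'` payload of user-space records.

```python
import re
from collections import defaultdict

# msg=audit(<seconds>.<millis>:<serial>) identifies the event a record belongs to
EVENT_ID = re.compile(r"msg=audit\((\d+\.\d+:\d+)\)")
FIELD = re.compile(r'(\w+)=("[^"]*"|\S+)')


def parse_record(line):
    """Return (event_id, fields) for one audit.log line, or None without an event ID."""
    match = EVENT_ID.search(line)
    if match is None:
        return None
    fields = {name: value.strip('"') for name, value in FIELD.findall(line)}
    return match.group(1), fields


def group_events(lines):
    """Group audit records by event ID, preserving record order."""
    events = defaultdict(list)
    for line in lines:
        parsed = parse_record(line)
        if parsed is not None:
            event_id, fields = parsed
            events[event_id].append(fields)
    return dict(events)


def identity_changes(events):
    """Yield (exe, path) for events whose SYSCALL record carries key="identity"."""
    for records in events.values():
        syscall = next((r for r in records if r.get("type") == "SYSCALL"), None)
        if syscall is None or syscall.get("key") != "identity":
            continue
        for record in records:
            if record.get("type") == "PATH" and "name" in record:
                yield syscall.get("exe"), record["name"]


# Abbreviated, made-up sample records in the raw audit.log layout
sample = [
    'type=SYSCALL msg=audit(1700000000.123:42): syscall=257 success=yes uid=1000 exe="/usr/bin/vim" key="identity"',
    'type=PATH msg=audit(1700000000.123:42): item=0 name="/etc/passwd" nametype=NORMAL',
    'type=SYSCALL msg=audit(1700000001.456:43): syscall=59 success=yes uid=0 exe="/usr/bin/sudo" key="privileged"',
]

changes = list(identity_changes(group_events(sample)))
print(changes)  # [('/usr/bin/vim', '/etc/passwd')]
```

For production use, the `ausearch` and `aureport` tools shipped with auditd already perform this kind of record correlation and are usually a safer starting point than a hand-written parser.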

Implement correlation rules for complex attack detection:

#!/usr/bin/env python3
# Security event correlation engine

from collections import defaultdict
from datetime import datetime, timedelta

class SecurityCorrelator:
    def __init__(self):
        self.failed_logins = defaultdict(list)
        self.privilege_escalations = defaultdict(list)
        self.file_access = defaultdict(list)
        
    def process_event(self, event):
        # Brute force detection
        if event['type'] == 'failed_login':
            self.failed_logins[event['source_ip']].append(event['timestamp'])
            
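            # Check for brute force pattern; keep only failures inside the
            # 10-minute window so the per-IP list stays bounded
            cutoff = datetime.now() - timedelta(minutes=10)
            recent_failures = [t for t in self.failed_logins[event['source_ip']]
                               if t > cutoff]
            self.failed_logins[event['source_ip']] = recent_failures
            
            if len(recent_failures) > 10:
                self.alert(f"Brute force attack from {event['source_ip']}: {len(recent_failures)} failures in 10 minutes")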
        
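        # Successful login after repeated failures (possible compromised credentials)
        elif event['type'] == 'successful_login':
            # Count only recent failures; .get() avoids creating empty entries
            cutoff = datetime.now() - timedelta(minutes=10)
            recent_failures = [t for t in self.failed_logins.get(event['source_ip'], [])
                               if t > cutoff]
            if len(recent_failures) > 5:
                self.alert(f"Successful login after multiple failures from {event['source_ip']} - possible breach")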
        
        # Privilege escalation detection
        elif event['type'] == 'privilege_use':
            self.privilege_escalations[event['user']].append(event['timestamp'])
            
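            # Check for unusual privilege usage pattern; keep only events from
            # the last hour so the per-user list stays bounded
            cutoff = datetime.now() - timedelta(hours=1)
            recent_privs = [t for t in self.privilege_escalations[event['user']]
                            if t > cutoff]
            self.privilege_escalations[event['user']] = recent_privs
            
            if len(recent_privs) > 5:
                self.alert(f"Unusual privilege usage by {event['user']}: {len(recent_privs)} in 1 hour")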
    
    def alert(self, message):
        print(f"[SECURITY ALERT] {datetime.now()}: {message}")
        # Send to SIEM/alerting system
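
The windowed counts used by the correlation rules above can also be computed from the timestamps carried by the events themselves, which makes the rules testable against replayed logs. The sketch below is one possible helper, not part of the engine above: `SlidingWindowCounter` is a hypothetical name, the IP address comes from a documentation-only range, and events are assumed to arrive in roughly chronological order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class SlidingWindowCounter:
    """Count events per key inside a trailing time window, using event timestamps."""

    def __init__(self, window):
        self.window = window
        self.events = defaultdict(deque)

    def add(self, key, timestamp):
        """Record an event and return how many events remain inside the window."""
        queue = self.events[key]
        queue.append(timestamp)
        # Drop timestamps that have fallen out of the window
        while queue and queue[0] <= timestamp - self.window:
            queue.popleft()
        return len(queue)


failures = SlidingWindowCounter(window=timedelta(minutes=10))
start = datetime(2024, 1, 1, 9, 0, 0)
source_ip = "203.0.113.7"  # documentation-range address, purely illustrative

# Twelve failures, 30 seconds apart, all inside one 10-minute window
counts = [failures.add(source_ip, start + timedelta(seconds=30 * i)) for i in range(12)]
print(counts[-1])  # 12

# A failure 30 minutes after the first: the earlier ones have expired
late = failures.add(source_ip, start + timedelta(minutes=30))
print(late)  # 1
```

A deque keeps expiry cheap because old timestamps are only ever removed from the left, and memory per key is bounded by the number of events in one window.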