Implementing Real-Time Monitoring

Real-time monitoring enables immediate response to security incidents and performance issues. Modern monitoring systems process firewall logs as they're generated, applying detection rules and generating alerts for security teams.

Stream processing architectures handle high-volume firewall logs efficiently. Tools like Apache Kafka, AWS Kinesis, or Azure Event Hubs ingest log streams from multiple firewalls and deliver them reliably to processing systems. Because these platforms buffer messages durably, consumers can fall behind during an attack-driven spike in log volume and catch up afterward, provided retention and capacity are sized for the peak:

# Kafka consumer for real-time firewall log processing
from kafka import KafkaConsumer
import json
from datetime import datetime

class FirewallLogProcessor:
    def __init__(self, kafka_servers, topic):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=kafka_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='firewall-monitor'
        )
        self.alert_thresholds = {
            'connection_rate': 1000,  # connections per minute
            'block_rate': 100,       # blocks per minute
            'single_ip_threshold': 50  # connections from single IP
        }
        
    def process_logs(self):
        ip_counter = {}
        window_start = datetime.now()
        
        for message in self.consumer:
            log_entry = message.value
            
            # Track connection attempts per IP
            source_ip = log_entry.get('source_ip')
            if source_ip:
                ip_counter[source_ip] = ip_counter.get(source_ip, 0) + 1
                
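                # Alert once per window, when the count first exceeds the threshold
                # (a plain ">" check would re-alert on every later message from this IP)
                if ip_counter[source_ip] == self.alert_thresholds['single_ip_threshold'] + 1:
                    self.generate_alert('high_connection_rate', source_ip, ip_counter[source_ip])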
            
            # Process other detection logic
            self.detect_patterns(log_entry)
            
            # Reset counters every minute (only checked when a message arrives)
            if (datetime.now() - window_start).total_seconds() >= 60:
                ip_counter.clear()
                window_start = datetime.now()
    
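    def detect_patterns(self, log_entry):
        # Detect potential SQL injection attempts
        if log_entry.get('rule_name') == 'block-sql-injection':
            self.analyze_sql_injection_attempt(log_entry)
        
        # Detect port scanning
        if self.is_port_scan(log_entry):
            self.generate_alert('port_scan', log_entry.get('source_ip'))

    # The helpers below are placeholders so the class runs as written;
    # replace them with real detection logic and alert delivery.
    def analyze_sql_injection_attempt(self, log_entry):
        print(f"SQL injection rule matched: {log_entry}")

    def is_port_scan(self, log_entry):
        return False  # placeholder: no port-scan detection yet

    def generate_alert(self, alert_type, source_ip, count=None):
        print(f"ALERT {alert_type}: source_ip={source_ip} count={count}")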
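
The listing above calls is_port_scan without showing how such a check might work. One common heuristic is to flag a source IP that contacts many distinct destination ports within a short sliding window. The following is a minimal sketch under stated assumptions: the class name PortScanDetector, the destination_port field name, the 20-port threshold, and the 60-second window are all illustrative and not taken from any particular firewall's log schema.

```python
from collections import defaultdict, deque
import time


class PortScanDetector:
    """Flags a source IP that contacts many distinct ports within a sliding window."""

    def __init__(self, port_threshold=20, window_seconds=60):
        self.port_threshold = port_threshold  # assumed value, tune per network
        self.window_seconds = window_seconds  # assumed value
        # source_ip -> deque of (timestamp, destination_port), oldest first
        self.events = defaultdict(deque)

    def is_port_scan(self, log_entry, now=None):
        source_ip = log_entry.get('source_ip')
        port = log_entry.get('destination_port')  # hypothetical field name
        if source_ip is None or port is None:
            return False
        # A monotonic clock is unaffected by wall-clock adjustments
        now = time.monotonic() if now is None else now

        events = self.events[source_ip]
        events.append((now, port))
        # Drop events that have aged out of the window
        while events and now - events[0][0] > self.window_seconds:
            events.popleft()

        distinct_ports = {p for _, p in events}
        return len(distinct_ports) > self.port_threshold
```

A processor could hold one detector instance and delegate its own is_port_scan to it. Unlike the fixed one-minute reset used for the per-IP counter, a sliding window does not miss bursts that straddle a reset boundary. Note that this sketch never evicts idle IPs, so a long-running consumer would need periodic cleanup of the events map.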

Real-time dashboards provide a visual representation of firewall activity. Tools like Grafana, Kibana, or custom solutions display metrics that help teams spot anomalies quickly:

// Simplified Grafana panel definitions with PromQL queries for firewall metrics
const firewallDashboard = {
  panels: [
    {
      title: "Connections per Second",
      targets: [{
        query: "sum(rate(firewall_connections_total[1m]))"
      }]
    },
    {
      title: "Top Blocked IPs",
      targets: [{
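        // Ranks by blocks in the last hour (the 1h window is an assumption);
        // a raw counter would rank by totals since the exporter started
        query: "topk(10, sum by (source_ip) (increase(firewall_blocks_total[1h])))"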
      }]
    },
    {
      title: "Geographic Distribution",
      targets: [{
        query: "sum by (country) (firewall_connections_total)"
      }]
    },
    {
      title: "Rule Hit Rate",
      targets: [{
        query: "rate(firewall_rule_matches_total[5m])"
      }]
    }
  ]
};
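
The rate(...) expressions in the panels above return the average per-second increase of a counter over the given window. To make that arithmetic concrete, here is a simplified Python sketch of the calculation, including counter-reset handling. It is not Prometheus's implementation, which also extrapolates toward the window boundaries; the function name simple_rate and the sample values are illustrative.

```python
def simple_rate(samples):
    """Approximate per-second rate from (timestamp_seconds, counter_value) samples.

    Samples must be ordered by timestamp. A drop in value is treated as a
    counter reset (for example, an exporter restart), so the post-reset value
    is counted as new increase.
    """
    if len(samples) < 2:
        return None  # a rate needs at least two samples

    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        if curr >= prev:
            increase += curr - prev
        else:
            increase += curr  # counter restarted from zero

    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else None


# Four scrapes 15 seconds apart, with a counter reset before the last one
samples = [(0, 1000), (15, 1300), (30, 1600), (45, 150)]
print(simple_rate(samples))
```

Here the total increase is 300 + 300 + 150 = 750 connections over 45 seconds. Treating the drop from 1600 to 150 as a reset, and not as a negative change, is what keeps the rate from going negative when an exporter restarts.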