Monitoring and Alerting
Monitoring and Alerting
Continuous monitoring transforms firewalls from static defenses into dynamic security tools. Effective monitoring strategies provide visibility into attack patterns, performance impacts, and potential misconfigurations.
Log Aggregation and Analysis: Centralize firewall logs for comprehensive analysis:
# Log aggregation pipeline
import json
from elasticsearch import Elasticsearch
from datetime import datetime
es = Elasticsearch(['localhost:9200'])
def process_firewall_log(log_entry):
# Parse log entry
parsed = {
'timestamp': datetime.now(),
'source_ip': log_entry.get('src'),
'destination_ip': log_entry.get('dst'),
'destination_port': log_entry.get('dpt'),
'action': log_entry.get('action'),
'rule_id': log_entry.get('rule_id')
}
# Enrich with GeoIP data
if parsed['source_ip']:
parsed['geoip'] = get_geoip_data(parsed['source_ip'])
# Index in Elasticsearch
es.index(
index=f"firewall-logs-{datetime.now().strftime('%Y.%m.%d')}",
body=parsed
)
# Check for alert conditions
if should_alert(parsed):
send_alert(parsed)
def should_alert(log_entry):
# Alert on high-frequency blocks from single IP
recent_blocks = es.count(
index="firewall-logs-*",
body={
"query": {
"bool": {
"must": [
{"term": {"source_ip": log_entry['source_ip']}},
{"term": {"action": "block"}},
{"range": {"timestamp": {"gte": "now-5m"}}}
]
}
}
}
)
return recent_blocks['count'] > 100
Key Performance Indicators: Track metrics that indicate firewall health and effectiveness:
- Connection Rate: New connections per second
- Rule Hit Rate: Frequency of each rule matching
- Processing Latency: Time added by firewall processing
- Drop Rate: Percentage of traffic blocked
- Resource Utilization: CPU, memory, and connection table usage
Automated Response: Implement automated responses to common scenarios:
# Automated threat response
class FirewallAutomation:
def __init__(self, firewall_api):
self.firewall = firewall_api
self.threat_threshold = 1000
def analyze_threats(self, time_window=300):
# Get recent blocked IPs
blocked_ips = self.firewall.get_blocked_ips(time_window)
for ip, count in blocked_ips.items():
if count > self.threat_threshold:
self.escalate_block(ip)
def escalate_block(self, ip):
# Add to long-term blocklist
self.firewall.add_to_blocklist(ip, duration=86400)
# Update threat intelligence
self.update_threat_intel(ip)
# Notify security team
self.send_notification(f"IP {ip} added to 24-hour block")