Automated Threat Intelligence
Automated Threat Intelligence
Modern firewall monitoring incorporates threat intelligence feeds to identify known malicious actors automatically. Integrating external threat data enhances detection capabilities without requiring manual rule updates.
Implement threat intelligence consumption:
import requests
import ipaddress
from datetime import datetime, timedelta
class ThreatIntelligenceManager:
    """Download IP blocklists and use them to annotate firewall log entries.

    Each configured feed is fetched over HTTP, parsed into a set of
    canonical IP-address strings, and cached per feed name. Lookups
    normalize the queried address the same way, so different spellings of
    one address (for example expanded vs. compressed IPv6) still match.
    """

    def __init__(self):
        # Feed name -> URL of a plain-text list with one address per line.
        self.threat_feeds = {
            'emerging_threats': 'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
            'abuse_ch': 'https://feodotracker.abuse.ch/downloads/ipblocklist.txt',
            'talos': 'https://talosintelligence.com/documents/ip-blacklist'
        }
        # Feed name -> set of canonical IP strings from the last good parse.
        self.threat_cache = {}
        # Feed name -> datetime of the last successful refresh.
        self.last_update = {}

    @staticmethod
    def _normalize_ip(value):
        """Return the canonical string form of *value*.

        Raises:
            ValueError: if *value* is not a valid IPv4 or IPv6 address.
        """
        if isinstance(value, str):
            value = value.strip()
        return str(ipaddress.ip_address(value))

    def update_threat_data(self) -> None:
        """Refresh every configured feed, reporting failures per feed.

        A feed that fails to download or returns a non-200 status keeps its
        previously cached data and its previous ``last_update`` timestamp.
        """
        for feed_name, feed_url in self.threat_feeds.items():
            # Deliberately broad: one broken feed must not stop the others
            # from refreshing.
            try:
                response = requests.get(feed_url, timeout=30)
                if response.status_code != 200:
                    # Report instead of silently skipping so stale feeds
                    # are visible to the operator.
                    print(f"Failed to update {feed_name}: HTTP {response.status_code}")
                    continue
                self.process_threat_feed(feed_name, response.text)
                self.last_update[feed_name] = datetime.now()
            except Exception as e:
                print(f"Failed to update {feed_name}: {e}")

    def process_threat_feed(self, feed_name, data) -> None:
        """Parse the text of a feed and replace that feed's cache entry.

        Blank lines and lines starting with ``#`` are ignored. Only the
        first whitespace-separated token of each remaining line is
        considered; tokens that are not valid IP addresses are skipped.
        """
        threat_ips = set()
        for line in data.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            try:
                threat_ips.add(self._normalize_ip(line.split()[0]))
            except ValueError:
                # Not an address (header text, CIDR range, etc.): skip it.
                continue
        self.threat_cache[feed_name] = threat_ips

    def check_ip_reputation(self, ip_address) -> list:
        """Return one record per feed that lists *ip_address*.

        Args:
            ip_address: Address to look up, as a string or an
                ``ipaddress`` address object.

        Returns:
            A list of ``{'feed': name, 'last_updated': datetime | 'unknown'}``
            dicts; empty when no feed lists the address or when the input
            is not a valid IP address.
        """
        try:
            # The cache stores canonical strings, so the query must be
            # canonicalized too or equivalent spellings would never match.
            normalized = self._normalize_ip(ip_address)
        except ValueError:
            return []

        return [
            {
                'feed': feed_name,
                'last_updated': self.last_update.get(feed_name, 'unknown')
            }
            for feed_name, threat_ips in self.threat_cache.items()
            if normalized in threat_ips
        ]

    def enrich_firewall_logs(self, log_entry) -> dict:
        """Annotate *log_entry* in place with threat data and return it.

        When the entry's ``source_ip`` appears in at least one feed, the
        keys ``threat_intel`` (the matching feed records) and
        ``threat_score`` (25 points per matching feed) are added.
        Entries without a match are returned unchanged.
        """
        source_ip = log_entry.get('source_ip')
        if source_ip:
            threats = self.check_ip_reputation(source_ip)
            if threats:
                log_entry['threat_intel'] = threats
                log_entry['threat_score'] = len(threats) * 25  # Simple scoring
        return log_entry