Automated Threat Intelligence

Automated Threat Intelligence

Modern firewall monitoring incorporates threat intelligence feeds to identify known malicious actors automatically. Integrating external threat data enhances detection capabilities without requiring manual rule updates.

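Implement threat intelligence consumption with a manager class that downloads the feeds, caches the IP addresses they list, and tags matching firewall log entries: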

import requests
import ipaddress
from datetime import datetime, timedelta

class ThreatIntelligenceManager:
    """Download IP blocklists and use them to annotate firewall log entries.

    Attributes are plain dictionaries keyed by feed name:

    * ``threat_feeds``  -- feed name -> URL of a text blocklist.
    * ``threat_cache``  -- feed name -> set of IP address strings in the
      canonical form produced by ``str(ipaddress.ip_address(...))``.
    * ``last_update``   -- feed name -> ``datetime`` of the last successful
      refresh (naive local time from ``datetime.now()``).
    """

    # Points added to ``threat_score`` for every feed that lists the IP.
    SCORE_PER_FEED = 25

    def __init__(self):
        self.threat_feeds = {
            'emerging_threats': 'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
            'abuse_ch': 'https://feodotracker.abuse.ch/downloads/ipblocklist.txt',
            'talos': 'https://talosintelligence.com/documents/ip-blacklist'
        }
        self.threat_cache = {}
        self.last_update = {}

    def update_threat_data(self):
        """Refresh every configured feed, one HTTP GET per feed.

        A failure on one feed is reported with ``print`` and does not stop
        the remaining feeds from being refreshed; the previously cached data
        for the failed feed is left untouched.
        """
        for feed_name, feed_url in self.threat_feeds.items():
            try:
                response = requests.get(feed_url, timeout=30)
                if response.status_code != 200:
                    # Previously ignored silently; surface it so a stale
                    # feed is noticeable.
                    print(f"Failed to update {feed_name}: HTTP {response.status_code}")
                    continue
                self.process_threat_feed(feed_name, response.text)
                self.last_update[feed_name] = datetime.now()
            except Exception as e:
                # Deliberately broad: this is the per-feed best-effort
                # boundary, and one bad feed must not abort the others.
                print(f"Failed to update {feed_name}: {e}")

    def process_threat_feed(self, feed_name, data):
        """Parse the text of one feed and replace that feed's cache entry.

        Blank lines and lines starting with ``#`` are skipped. For every
        other line only the first whitespace-separated token is considered;
        it is kept when it parses as an IPv4/IPv6 address and ignored
        otherwise (so CIDR ranges, hostnames and headers are dropped).

        Args:
            feed_name: Key under which the parsed set is stored.
            data: Raw feed body as a string.
        """
        threat_ips = set()

        for line in data.splitlines():
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            try:
                # Store the canonical spelling so lookups can normalise too.
                threat_ips.add(str(ipaddress.ip_address(line.split()[0])))
            except ValueError:
                continue

        self.threat_cache[feed_name] = threat_ips

    def check_ip_reputation(self, ip_address):
        """Return one record per cached feed that lists ``ip_address``.

        The address is looked up both exactly as given and in canonical
        form, so equivalent spellings (expanded or upper-case IPv6,
        surrounding whitespace, ``ipaddress`` objects) match the cache,
        which stores canonical strings.

        Args:
            ip_address: Address to look up.

        Returns:
            A list of ``{'feed': name, 'last_updated': datetime | 'unknown'}``
            dictionaries; empty when no feed lists the address or the value
            is not a valid IP address.
        """
        candidates = [ip_address]
        try:
            canonical = str(ipaddress.ip_address(str(ip_address).strip()))
        except ValueError:
            canonical = None
        if canonical is not None and canonical != ip_address:
            candidates.append(canonical)

        threats = []
        for feed_name, threat_ips in self.threat_cache.items():
            if any(candidate in threat_ips for candidate in candidates):
                threats.append({
                    'feed': feed_name,
                    'last_updated': self.last_update.get(feed_name, 'unknown')
                })

        return threats

    def enrich_firewall_logs(self, log_entry):
        """Attach threat intelligence to a firewall log entry.

        Reads ``log_entry['source_ip']``. When at least one feed lists that
        address, ``threat_intel`` (the matching records) and ``threat_score``
        (``SCORE_PER_FEED`` per matching feed) are added. The dictionary is
        modified in place and also returned.

        Args:
            log_entry: Mapping describing one log line.

        Returns:
            The same ``log_entry`` object.
        """
        source_ip = log_entry.get('source_ip')
        if source_ip:
            threats = self.check_ip_reputation(source_ip)
            if threats:
                log_entry['threat_intel'] = threats
                log_entry['threat_score'] = len(threats) * self.SCORE_PER_FEED  # Simple scoring

        return log_entry