Deep Dive into CVE Detection Mechanisms

Deep Dive into CVE Detection Mechanisms

Modern CVE detection tools employ sophisticated techniques to identify vulnerabilities across different package ecosystems:

#!/usr/bin/env python3
# cve-detection-analyzer.py

import json
import requests
from datetime import datetime, timedelta
import hashlib

class CVEDetectionAnalyzer:
    def __init__(self):
        """Store the NVD endpoint and register one checker per advisory source.

        ``vulnerability_sources`` maps a source key to the bound method that
        queries that source. Its insertion order is the order in which
        ``analyze_package`` iterates over the sources.
        """
        self.nvd_api = "https://services.nvd.nist.gov/rest/json/cves/2.0"

        # Keyword form: same keys, same insertion order as a dict literal.
        self.vulnerability_sources = dict(
            nvd=self.check_nvd,
            redhat=self.check_redhat_security,
            debian=self.check_debian_security,
            alpine=self.check_alpine_security,
            nodejs=self.check_npm_audit,
            python=self.check_python_safety,
            ruby=self.check_ruby_advisory,
        )
        
    def analyze_package(self, package_name, version, ecosystem):
        """Comprehensive CVE analysis for a package"""
        results = {
            'package': package_name,
            'version': version,
            'ecosystem': ecosystem,
            'scan_date': datetime.now().isoformat(),
            'vulnerabilities': [],
            'risk_score': 0
        }
        
        # Check multiple sources
        for source, checker in self.vulnerability_sources.items():
            if self.is_relevant_source(ecosystem, source):
                vulns = checker(package_name, version)
                results['vulnerabilities'].extend(vulns)
        
        # Calculate aggregate risk score
        results['risk_score'] = self.calculate_risk_score(results['vulnerabilities'])
        
        # Add exploitation context
        results['exploitation_context'] = self.get_exploitation_context(results['vulnerabilities'])
        
        return results
    
    def check_nvd(self, package_name, version, timeout=30):
        """Query the NVD CVE API for CVEs that affect ``version`` of a package.

        Only one page of up to 100 keyword-search results is requested.

        Args:
            package_name: Keyword sent to NVD as ``keywordSearch``.
            version: Version tested against each CVE via ``is_version_affected``.
            timeout: Seconds to wait for the NVD server before giving up.

        Returns:
            A list of dicts with the keys ``cve_id``, ``description``,
            ``cvss_v3``, ``published_date``, ``references`` and
            ``cpe_matches``. The lookup is best-effort: any failure is printed
            and the entries collected so far are returned.
        """
        vulnerabilities = []

        params = {
            'keywordSearch': package_name,
            'resultsPerPage': 100
        }

        try:
            # Without a timeout, requests waits indefinitely on a stalled
            # connection and would block the whole scan.
            response = requests.get(self.nvd_api, params=params, timeout=timeout)
            # Report HTTP error statuses instead of trying to parse an error
            # body as if it were a result page.
            response.raise_for_status()
            data = response.json()

            for cve_item in data.get('vulnerabilities', []):
                cve = cve_item['cve']

                # Skip CVEs whose configurations do not cover this version.
                if not self.is_version_affected(version, cve):
                    continue

                # Prefer the English description; fall back to the first entry.
                descriptions = cve['descriptions']
                description = next(
                    (entry for entry in descriptions if entry.get('lang') == 'en'),
                    descriptions[0],
                )

                vulnerabilities.append({
                    'cve_id': cve['id'],
                    'description': description['value'],
                    'cvss_v3': self.extract_cvss_v3(cve),
                    'published_date': cve['published'],
                    'references': self.extract_references(cve),
                    'cpe_matches': self.extract_cpe_matches(cve)
                })

        except Exception as e:
            # Deliberately broad: one failing source must not abort the scan.
            print(f"Error querying NVD: {e}")

        return vulnerabilities
    
    def is_version_affected(self, version, cve_data):
        """Determine if a specific version is affected by CVE"""
        configurations = cve_data.get('configurations', [])
        
        for config in configurations:
            for node in config.get('nodes', []):
                for cpe_match in node.get('cpeMatch', []):
                    if cpe_match.get('vulnerable', False):
                        # Version range checking
                        version_start = cpe_match.get('versionStartIncluding')
                        version_end = cpe_match.get('versionEndExcluding')
                        
                        if self.version_in_range(version, version_start, version_end):
                            return True
        
        return False
    
    def calculate_risk_score(self, vulnerabilities):
        """Calculate an aggregate risk score from a list of vulnerability dicts.

        Four factors are combined with fixed weights: the highest CVSS v3 base
        score (0.4), exploit availability (0.3), patch availability (0.2) and
        age (0.1).

        Args:
            vulnerabilities: Vulnerability dicts. The keys read are
                ``cvss_v3`` (dict with ``base_score``), ``exploit_available``,
                ``patch_available`` and ``published_date`` (ISO-8601 string).
                All of them are optional; a missing or unparseable
                ``published_date`` contributes nothing to the age factor.

        Returns:
            ``0`` when ``vulnerabilities`` is empty, otherwise the weighted sum
            rounded to two decimals.
        """
        if not vulnerabilities:
            return 0

        factors = {
            'cvss_score': 0,
            'exploit_availability': 0,
            'patch_availability': 0,
            'age_factor': 0
        }

        for vuln in vulnerabilities:
            # CVSS score factor: keep the worst score seen. A source may report
            # no CVSS data at all (missing key or None), which counts as 0.
            cvss = (vuln.get('cvss_v3') or {}).get('base_score') or 0
            factors['cvss_score'] = max(factors['cvss_score'], cvss)

            # Exploit availability factor: one known exploit is enough.
            if vuln.get('exploit_available', False):
                factors['exploit_availability'] = 10

            # Patch availability factor.
            # NOTE(review): this factor accumulates per vulnerability and is
            # unbounded, unlike the other three -- confirm that is intended.
            if vuln.get('patch_available', False):
                factors['patch_availability'] += 2
            else:
                factors['patch_availability'] += 5

            # Age factor (older = higher risk).
            published = vuln.get('published_date')
            if isinstance(published, str):
                # fromisoformat() only accepts a trailing 'Z' from Python 3.11.
                if published.endswith('Z'):
                    published = published[:-1] + '+00:00'
                try:
                    published = datetime.fromisoformat(published)
                except ValueError:
                    published = None
            if not isinstance(published, datetime):
                # No usable publication date: leave the age factor untouched.
                continue

            # Take "now" with the same awareness as the timestamp; naive and
            # aware datetimes cannot be subtracted from each other.
            age_days = (datetime.now(published.tzinfo) - published).days
            if age_days > 365:
                factors['age_factor'] = max(factors['age_factor'], 8)
            elif age_days > 90:
                factors['age_factor'] = max(factors['age_factor'], 5)

        # Weighted calculation
        risk_score = (
            factors['cvss_score'] * 0.4 +
            factors['exploit_availability'] * 0.3 +
            factors['patch_availability'] * 0.2 +
            factors['age_factor'] * 0.1
        )

        return round(risk_score, 2)