Deep Dive into CVE Detection Mechanisms
Deep Dive into CVE Detection Mechanisms
Modern CVE detection tools employ sophisticated techniques to identify vulnerabilities across different package ecosystems:
#!/usr/bin/env python3
# cve-detection-analyzer.py
import json
import requests
from datetime import datetime, timedelta
import hashlib
class CVEDetectionAnalyzer:
def __init__(self):
self.nvd_api = "https://services.nvd.nist.gov/rest/json/cves/2.0"
self.vulnerability_sources = {
'nvd': self.check_nvd,
'redhat': self.check_redhat_security,
'debian': self.check_debian_security,
'alpine': self.check_alpine_security,
'nodejs': self.check_npm_audit,
'python': self.check_python_safety,
'ruby': self.check_ruby_advisory
}
def analyze_package(self, package_name, version, ecosystem):
"""Comprehensive CVE analysis for a package"""
results = {
'package': package_name,
'version': version,
'ecosystem': ecosystem,
'scan_date': datetime.now().isoformat(),
'vulnerabilities': [],
'risk_score': 0
}
# Check multiple sources
for source, checker in self.vulnerability_sources.items():
if self.is_relevant_source(ecosystem, source):
vulns = checker(package_name, version)
results['vulnerabilities'].extend(vulns)
# Calculate aggregate risk score
results['risk_score'] = self.calculate_risk_score(results['vulnerabilities'])
# Add exploitation context
results['exploitation_context'] = self.get_exploitation_context(results['vulnerabilities'])
return results
def check_nvd(self, package_name, version):
"""Query NVD database for CVEs"""
vulnerabilities = []
params = {
'keywordSearch': package_name,
'resultsPerPage': 100
}
try:
response = requests.get(self.nvd_api, params=params)
data = response.json()
for cve_item in data.get('vulnerabilities', []):
cve = cve_item['cve']
# Check if version is affected
if self.is_version_affected(version, cve):
vulnerabilities.append({
'cve_id': cve['id'],
'description': cve['descriptions'][0]['value'],
'cvss_v3': self.extract_cvss_v3(cve),
'published_date': cve['published'],
'references': self.extract_references(cve),
'cpe_matches': self.extract_cpe_matches(cve)
})
except Exception as e:
print(f"Error querying NVD: {e}")
return vulnerabilities
def is_version_affected(self, version, cve_data):
"""Determine if a specific version is affected by CVE"""
configurations = cve_data.get('configurations', [])
for config in configurations:
for node in config.get('nodes', []):
for cpe_match in node.get('cpeMatch', []):
if cpe_match.get('vulnerable', False):
# Version range checking
version_start = cpe_match.get('versionStartIncluding')
version_end = cpe_match.get('versionEndExcluding')
if self.version_in_range(version, version_start, version_end):
return True
return False
def calculate_risk_score(self, vulnerabilities):
"""Calculate aggregate risk score based on multiple factors"""
if not vulnerabilities:
return 0
factors = {
'cvss_score': 0,
'exploit_availability': 0,
'patch_availability': 0,
'age_factor': 0
}
for vuln in vulnerabilities:
# CVSS score factor
cvss = vuln.get('cvss_v3', {}).get('base_score', 0)
factors['cvss_score'] = max(factors['cvss_score'], cvss)
# Exploit availability factor
if vuln.get('exploit_available', False):
factors['exploit_availability'] = 10
# Patch availability factor
if vuln.get('patch_available', False):
factors['patch_availability'] += 2
else:
factors['patch_availability'] += 5
# Age factor (older = higher risk)
published_date = datetime.fromisoformat(vuln['published_date'])
age_days = (datetime.now() - published_date).days
if age_days > 365:
factors['age_factor'] = max(factors['age_factor'], 8)
elif age_days > 90:
factors['age_factor'] = max(factors['age_factor'], 5)
# Weighted calculation
risk_score = (
factors['cvss_score'] * 0.4 +
factors['exploit_availability'] * 0.3 +
factors['patch_availability'] * 0.2 +
factors['age_factor'] * 0.1
)
return round(risk_score, 2)