CVE Detection with Snyk's Intelligence
CVE Detection with Snyk's Intelligence
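Snyk's vulnerability database exposes CVE intelligence beyond the raw CVSS score, including exploit maturity, remediation advice, and real-world impact data. The script below queries that data and combines it into a risk score: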
#!/usr/bin/env python3
# snyk-cve-intelligence.py
import requests
import json
from typing import Dict, List, Any
import asyncio
import aiohttp
class SnykCVEIntelligence:
    """Collect per-CVE intelligence from the Snyk API and derive a risk score.

    The instance stores the API token, the API base URL and the HTTP headers
    that every request sends.

    NOTE(review): ``analyze_cve`` and ``get_vulnerability_details`` call
    ``get_remediation_advice``, ``get_impact_analysis`` and
    ``parse_affected_packages``, none of which are defined in this class as
    shown here. They must be provided (e.g. by a subclass or a later part of
    the file) before those two methods can run.
    """

    # Multiplier applied to the CVSS base score for each exploit-maturity
    # label. Labels that are not listed fall back to a neutral 1.0.
    _EXPLOIT_MULTIPLIERS = {
        'Mature': 2.0,
        'Proof of Concept': 1.5,
        'Functional': 1.3,
        'Unproven': 1.0,
        'No Data': 0.8,
    }

    def __init__(self, api_token):
        """Store the token and build the headers sent with every request.

        Args:
            api_token: Snyk API token; sent as ``Authorization: token <value>``.
        """
        self.api_token = api_token
        self.base_url = "https://api.snyk.io/v1"
        self.headers = {
            "Authorization": f"token {api_token}",
            "Content-Type": "application/json"
        }

    async def analyze_cve(self, cve_id: str) -> Dict[str, Any]:
        """Get detailed CVE intelligence from Snyk.

        Opens one ``aiohttp.ClientSession`` and awaits, in order, the
        vulnerability details, exploit intelligence, remediation advice and
        impact analysis for ``cve_id``.

        Args:
            cve_id: Identifier interpolated into the request URLs.

        Returns:
            A dict with the keys ``cve_id``, ``vulnerability``,
            ``exploit_intelligence``, ``remediation``, ``real_world_impact``
            and ``risk_score`` (the result of ``calculate_snyk_risk_score``).
        """
        async with aiohttp.ClientSession() as session:
            # Get vulnerability details
            vuln_data = await self.get_vulnerability_details(session, cve_id)
            # Get exploit intelligence
            exploit_data = await self.get_exploit_intelligence(session, cve_id)
            # Get remediation data
            remediation_data = await self.get_remediation_advice(session, cve_id)
            # Get real-world impact data
            impact_data = await self.get_impact_analysis(session, cve_id)
            return {
                'cve_id': cve_id,
                'vulnerability': vuln_data,
                'exploit_intelligence': exploit_data,
                'remediation': remediation_data,
                'real_world_impact': impact_data,
                'risk_score': self.calculate_snyk_risk_score(
                    vuln_data, exploit_data, impact_data
                )
            }

    async def get_vulnerability_details(self, session, cve_id):
        """Fetch detailed vulnerability information.

        Args:
            session: Open client session used to issue the GET request.
            cve_id: Identifier appended to ``<base_url>/vulnerabilities/``.

        Returns:
            A dict with ``title``, ``description``, ``cvss_score``,
            ``cvss_vector``, ``cwe``, ``publication_time``,
            ``disclosure_time`` and ``affected_packages``. Any field missing
            from the response body is ``None``.
        """
        url = f"{self.base_url}/vulnerabilities/{cve_id}"
        async with session.get(url, headers=self.headers) as response:
            # NOTE(review): unlike get_exploit_intelligence, the HTTP status
            # is not checked here, so an error response is parsed like a
            # successful one.
            data = await response.json()
            return {
                'title': data.get('title'),
                'description': data.get('description'),
                'cvss_score': data.get('cvssScore'),
                'cvss_vector': data.get('cvssVector'),
                'cwe': data.get('cwe'),
                'publication_time': data.get('publicationTime'),
                'disclosure_time': data.get('disclosureTime'),
                'affected_packages': self.parse_affected_packages(data)
            }

    async def get_exploit_intelligence(self, session, cve_id):
        """Get exploit availability and maturity information.

        Args:
            session: Open client session used to issue the GET request.
            cve_id: Identifier used to build the exploit-intelligence URL.

        Returns:
            On HTTP 200, a dict with ``exploit_maturity``,
            ``exploit_availability``, ``exploit_sources``, ``weaponized``,
            ``in_the_wild`` and ``poc_available``, each with a default when
            the response omits it. On any other status, the minimal dict
            ``{'exploit_maturity': 'No Data'}``.
        """
        url = f"{self.base_url}/vulnerabilities/{cve_id}/exploit-intelligence"
        async with session.get(url, headers=self.headers) as response:
            if response.status == 200:
                data = await response.json()
                return {
                    'exploit_maturity': data.get('exploitMaturity', 'No Data'),
                    'exploit_availability': data.get('exploitAvailability', False),
                    'exploit_sources': data.get('exploitSources', []),
                    'weaponized': data.get('weaponized', False),
                    'in_the_wild': data.get('inTheWild', False),
                    'poc_available': data.get('pocAvailable', False)
                }
            return {'exploit_maturity': 'No Data'}

    def calculate_snyk_risk_score(self, vuln_data, exploit_data, impact_data):
        """Combine CVSS, exploit maturity and impact data into a risk score.

        The CVSS base score is multiplied by an exploit-maturity factor, then
        by 1.5 when there are active campaigns and by 1.2 when no fix is
        available. The product is capped at 10.

        Args:
            vuln_data: Dict read for ``cvss_score`` and ``fix_available``.
            exploit_data: Dict read for ``exploit_maturity``.
            impact_data: Dict read for ``active_campaigns``.

        Returns:
            A dict with ``score`` (rounded to two decimals), ``severity`` and
            a ``factors`` dict listing the inputs that drove the score.
        """
        # get_vulnerability_details always sets the 'cvss_score' key, with a
        # value of None when the API omits it, so a .get() default alone is
        # not enough: coalesce None to 0 before multiplying.
        base_score = vuln_data.get('cvss_score') or 0

        # Adjust based on exploit maturity
        maturity = exploit_data.get('exploit_maturity', 'No Data')
        exploit_multiplier = self._EXPLOIT_MULTIPLIERS.get(maturity, 1.0)

        # Adjust based on real-world impact (an explicit None counts as zero)
        active_exploitation = (impact_data.get('active_campaigns') or 0) > 0
        if active_exploitation:
            exploit_multiplier *= 1.5

        # Adjust based on fix availability.
        # NOTE(review): get_vulnerability_details never sets 'fix_available',
        # so with data from this class the default of True always applies.
        fix_available = vuln_data.get('fix_available', True)
        if not fix_available:
            exploit_multiplier *= 1.2

        risk_score = min(10, base_score * exploit_multiplier)
        return {
            'score': round(risk_score, 2),
            'severity': self.score_to_severity(risk_score),
            'factors': {
                'base_cvss': base_score,
                'exploit_maturity': exploit_data.get('exploit_maturity'),
                'active_exploitation': active_exploitation,
                'fix_available': fix_available
            }
        }

    def score_to_severity(self, score):
        """Convert a risk score to a severity level.

        Returns:
            ``'CRITICAL'`` for scores of 9.0 and above, ``'HIGH'`` from 7.0,
            ``'MEDIUM'`` from 4.0, otherwise ``'LOW'``.
        """
        if score >= 9.0:
            return 'CRITICAL'
        elif score >= 7.0:
            return 'HIGH'
        elif score >= 4.0:
            return 'MEDIUM'
        else:
            return 'LOW'
# Integration with container scanning
async def enhanced_container_scan(image_name, snyk_token):
    """Perform enhanced container scan with CVE intelligence.

    Runs ``run_snyk_container_test`` on the image, looks up intelligence for
    the first CVE identifier of each reported vulnerability, and returns the
    enriched findings sorted by descending priority score.

    Vulnerabilities that carry no CVE identifier are left out of the result.

    Args:
        image_name: Image reference passed to ``run_snyk_container_test``.
        snyk_token: API token used to build the ``SnykCVEIntelligence`` client.

    Returns:
        A dict with ``image``, ``scan_date`` (ISO-8601 string of the local
        time), ``vulnerabilities`` (enriched and sorted) and ``summary``.
    """
    # Imported locally because the file's import block does not provide it.
    from datetime import datetime

    # Run standard Snyk container test
    scan_results = run_snyk_container_test(image_name)
    # Initialize intelligence analyzer
    intelligence = SnykCVEIntelligence(snyk_token)
    # Enhance each CVE with intelligence
    enhanced_results = []
    for vuln in scan_results.get('vulnerabilities', []):
        # The 'CVE' key may be present with an empty list (or 'identifiers'
        # may be None); indexing [0] unconditionally would raise.
        cve_ids = (vuln.get('identifiers') or {}).get('CVE') or []
        cve_id = cve_ids[0] if cve_ids else None
        if not cve_id:
            continue
        # Get enhanced intelligence
        intel_data = await intelligence.analyze_cve(cve_id)
        # Merge with scan data
        enhanced_results.append({
            **vuln,
            'intelligence': intel_data,
            'priority_score': calculate_priority_score(vuln, intel_data)
        })
    # Sort by priority, highest first
    enhanced_results.sort(key=lambda x: x['priority_score'], reverse=True)
    return {
        'image': image_name,
        'scan_date': datetime.now().isoformat(),
        'vulnerabilities': enhanced_results,
        'summary': generate_intelligence_summary(enhanced_results)
    }
def calculate_priority_score(vuln, intel_data):
    """Calculate priority score for remediation.

    Each factor is normalised to the range 0..1 (assuming a CVSS score on the
    0..10 scale and a package importance in 0..1 — TODO confirm the latter
    against ``get_package_importance``), multiplied by its weight, and the
    weighted sum is scaled to 0..100.

    Args:
        vuln: Scan finding; read for ``fixedIn`` and ``packageName``.
        intel_data: Intelligence dict; read for ``vulnerability.cvss_score``
            and ``exploit_intelligence.exploit_availability`` / ``in_the_wild``.

    Returns:
        The weighted score, rounded to two decimals.
    """
    vuln_info = intel_data['vulnerability']
    exploit_info = intel_data['exploit_intelligence']
    factors = {
        # The key can be present with a None value when the API omits the
        # score; coalesce to 0 so the division cannot raise TypeError.
        'cvss_score': (vuln_info.get('cvss_score') or 0) / 10,
        'exploit_available': 1.0 if exploit_info.get('exploit_availability') else 0.0,
        'in_the_wild': 1.0 if exploit_info.get('in_the_wild') else 0.0,
        # Despite the name, this factor is 1.0 when NO fix is listed, so an
        # unfixable finding ranks higher.
        'fix_available': 0.0 if vuln.get('fixedIn') else 1.0,
        'package_importance': get_package_importance(vuln.get('packageName'))
    }
    weights = {
        'cvss_score': 0.2,
        'exploit_available': 0.3,
        'in_the_wild': 0.3,
        'fix_available': 0.1,
        'package_importance': 0.1
    }
    score = sum(factors[k] * weights[k] for k in factors)
    return round(score * 100, 2)