CVE Detection with Snyk's Intelligence

CVE Detection with Snyk's Intelligence

Snyk provides unique CVE intelligence through its vulnerability database:

#!/usr/bin/env python3
# snyk-cve-intelligence.py

import requests
import json
from typing import Dict, List, Any
import asyncio
import aiohttp

class SnykCVEIntelligence:
    def __init__(self, api_token):
        self.api_token = api_token
        self.base_url = "https://api.snyk.io/v1"
        self.headers = {
            "Authorization": f"token {api_token}",
            "Content-Type": "application/json"
        }
        
    async def analyze_cve(self, cve_id: str) -> Dict[str, Any]:
        """Get detailed CVE intelligence from Snyk"""
        async with aiohttp.ClientSession() as session:
            # Get vulnerability details
            vuln_data = await self.get_vulnerability_details(session, cve_id)
            
            # Get exploit intelligence
            exploit_data = await self.get_exploit_intelligence(session, cve_id)
            
            # Get remediation data
            remediation_data = await self.get_remediation_advice(session, cve_id)
            
            # Get real-world impact data
            impact_data = await self.get_impact_analysis(session, cve_id)
            
            return {
                'cve_id': cve_id,
                'vulnerability': vuln_data,
                'exploit_intelligence': exploit_data,
                'remediation': remediation_data,
                'real_world_impact': impact_data,
                'risk_score': self.calculate_snyk_risk_score(
                    vuln_data, exploit_data, impact_data
                )
            }
    
    async def get_vulnerability_details(self, session, cve_id):
        """Fetch detailed vulnerability information"""
        url = f"{self.base_url}/vulnerabilities/{cve_id}"
        
        async with session.get(url, headers=self.headers) as response:
            data = await response.json()
            
            return {
                'title': data.get('title'),
                'description': data.get('description'),
                'cvss_score': data.get('cvssScore'),
                'cvss_vector': data.get('cvssVector'),
                'cwe': data.get('cwe'),
                'publication_time': data.get('publicationTime'),
                'disclosure_time': data.get('disclosureTime'),
                'affected_packages': self.parse_affected_packages(data)
            }
    
    async def get_exploit_intelligence(self, session, cve_id):
        """Get exploit availability and maturity information"""
        url = f"{self.base_url}/vulnerabilities/{cve_id}/exploit-intelligence"
        
        async with session.get(url, headers=self.headers) as response:
            if response.status == 200:
                data = await response.json()
                
                return {
                    'exploit_maturity': data.get('exploitMaturity', 'No Data'),
                    'exploit_availability': data.get('exploitAvailability', False),
                    'exploit_sources': data.get('exploitSources', []),
                    'weaponized': data.get('weaponized', False),
                    'in_the_wild': data.get('inTheWild', False),
                    'poc_available': data.get('pocAvailable', False)
                }
            
            return {'exploit_maturity': 'No Data'}
    
    def calculate_snyk_risk_score(self, vuln_data, exploit_data, impact_data):
        """Calculate risk score using Snyk's methodology"""
        base_score = vuln_data.get('cvss_score', 0)
        
        # Adjust based on exploit maturity
        exploit_multiplier = {
            'Mature': 2.0,
            'Proof of Concept': 1.5,
            'Functional': 1.3,
            'Unproven': 1.0,
            'No Data': 0.8
        }.get(exploit_data.get('exploit_maturity', 'No Data'), 1.0)
        
        # Adjust based on real-world impact
        if impact_data.get('active_campaigns', 0) > 0:
            exploit_multiplier *= 1.5
            
        # Adjust based on fix availability
        if not vuln_data.get('fix_available', True):
            exploit_multiplier *= 1.2
            
        risk_score = min(10, base_score * exploit_multiplier)
        
        return {
            'score': round(risk_score, 2),
            'severity': self.score_to_severity(risk_score),
            'factors': {
                'base_cvss': base_score,
                'exploit_maturity': exploit_data.get('exploit_maturity'),
                'active_exploitation': impact_data.get('active_campaigns', 0) > 0,
                'fix_available': vuln_data.get('fix_available', True)
            }
        }
    
    def score_to_severity(self, score):
        """Convert risk score to severity level"""
        if score >= 9.0:
            return 'CRITICAL'
        elif score >= 7.0:
            return 'HIGH'
        elif score >= 4.0:
            return 'MEDIUM'
        else:
            return 'LOW'

# Integration with container scanning
async def enhanced_container_scan(image_name, snyk_token):
    """Perform enhanced container scan with CVE intelligence"""
    
    # Run standard Snyk container test
    scan_results = run_snyk_container_test(image_name)
    
    # Initialize intelligence analyzer
    intelligence = SnykCVEIntelligence(snyk_token)
    
    # Enhance each CVE with intelligence
    enhanced_results = []
    
    for vuln in scan_results.get('vulnerabilities', []):
        cve_id = vuln.get('identifiers', {}).get('CVE', [None])[0]
        
        if cve_id:
            # Get enhanced intelligence
            intel_data = await intelligence.analyze_cve(cve_id)
            
            # Merge with scan data
            enhanced_vuln = {
                **vuln,
                'intelligence': intel_data,
                'priority_score': calculate_priority_score(vuln, intel_data)
            }
            
            enhanced_results.append(enhanced_vuln)
    
    # Sort by priority
    enhanced_results.sort(key=lambda x: x['priority_score'], reverse=True)
    
    return {
        'image': image_name,
        'scan_date': datetime.now().isoformat(),
        'vulnerabilities': enhanced_results,
        'summary': generate_intelligence_summary(enhanced_results)
    }

def calculate_priority_score(vuln, intel_data):
    """Calculate priority score for remediation"""
    factors = {
        'cvss_score': intel_data['vulnerability'].get('cvss_score', 0) / 10,
        'exploit_available': 1.0 if intel_data['exploit_intelligence'].get('exploit_availability') else 0.0,
        'in_the_wild': 1.0 if intel_data['exploit_intelligence'].get('in_the_wild') else 0.0,
        'fix_available': 0.0 if vuln.get('fixedIn') else 1.0,
        'package_importance': get_package_importance(vuln.get('packageName'))
    }
    
    weights = {
        'cvss_score': 0.2,
        'exploit_available': 0.3,
        'in_the_wild': 0.3,
        'fix_available': 0.1,
        'package_importance': 0.1
    }
    
    score = sum(factors[k] * weights[k] for k in factors)
    return round(score * 100, 2)