Prioritizing Vulnerabilities for Remediation

Prioritizing Vulnerabilities for Remediation

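Not all vulnerabilities require immediate attention. Effective remediation starts with prioritization: ranking each finding by severity (CVSS), likelihood of exploitation (EPSS), the criticality and exposure of the affected service, fix availability, and age, as the following script does: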

#!/usr/bin/env python3
# vulnerability-prioritization.py

import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import requests

class VulnerabilityPrioritizer:
    def __init__(self, scan_results: Dict):
        self.scan_results = scan_results
        self.epss_data = self.load_epss_scores()
        self.asset_criticality = self.load_asset_criticality()
        
    def load_epss_scores(self) -> Dict:
        """Load Exploit Prediction Scoring System data"""
        # In production, this would fetch from FIRST.org EPSS API
        return {
            "CVE-2023-12345": 0.97,  # High likelihood of exploitation
            "CVE-2023-23456": 0.12,  # Low likelihood
            # ... more CVE scores
        }
        
    def load_asset_criticality(self) -> Dict:
        """Load asset criticality ratings"""
        return {
            "payment-service": 10,  # Critical
            "blog-app": 3,         # Low importance
            "api-gateway": 9,      # High importance
            # ... more ratings
        }
    
    def calculate_risk_score(self, vulnerability: Dict, image_context: Dict) -> float:
        """Calculate contextualized risk score for a vulnerability"""
        
        # Base CVSS score (0-10)
        cvss_score = vulnerability.get('cvss_score', 0)
        
        # EPSS score (0-1, probability of exploitation)
        epss_score = self.epss_data.get(vulnerability['cve_id'], 0.5)
        
        # Asset criticality (0-10)
        asset_criticality = self.asset_criticality.get(
            image_context['service_name'], 5
        )
        
        # Exposure factor (0-1)
        exposure_factor = self.calculate_exposure_factor(image_context)
        
        # Fix availability factor (0-1)
        fix_factor = 0.2 if vulnerability.get('fixed_version') else 0.8
        
        # Age factor (older = higher priority)
        age_days = self.calculate_vulnerability_age(vulnerability)
        age_factor = min(1.0, age_days / 365)
        
        # Calculate weighted risk score
        risk_score = (
            (cvss_score * 0.25) +
            (epss_score * 10 * 0.25) +
            (asset_criticality * 0.20) +
            (exposure_factor * 10 * 0.15) +
            (fix_factor * 10 * 0.10) +
            (age_factor * 10 * 0.05)
        )
        
        return round(risk_score, 2)
    
    def calculate_exposure_factor(self, image_context: Dict) -> float:
        """Calculate exposure based on deployment context"""
        factors = {
            'internet_facing': 1.0,
            'internal_only': 0.5,
            'development': 0.2,
            'air_gapped': 0.1
        }
        
        return factors.get(image_context.get('exposure_level', 'internal_only'), 0.5)
    
    def prioritize_vulnerabilities(self) -> List[Dict]:
        """Generate prioritized list of vulnerabilities for remediation"""
        prioritized_vulns = []
        
        for image_result in self.scan_results:
            image_context = {
                'service_name': self.extract_service_name(image_result['image']),
                'exposure_level': self.determine_exposure_level(image_result),
                'namespace': image_result.get('namespace', 'default')
            }
            
            for vuln in image_result.get('vulnerabilities', []):
                vuln_priority = {
                    'vulnerability': vuln,
                    'image': image_result['image'],
                    'risk_score': self.calculate_risk_score(vuln, image_context),
                    'remediation_effort': self.estimate_remediation_effort(vuln, image_result),
                    'business_context': image_context
                }
                
                prioritized_vulns.append(vuln_priority)
        
        # Sort by risk score (descending) and effort (ascending)
        prioritized_vulns.sort(
            key=lambda x: (x['risk_score'], -x['remediation_effort']),
            reverse=True
        )
        
        return prioritized_vulns
    
    def estimate_remediation_effort(self, vulnerability: Dict, image_result: Dict) -> int:
        """Estimate effort required for remediation (1-10 scale)"""
        effort = 1
        
        # Base image update required
        if vulnerability.get('layer_id') == image_result.get('base_layer_id'):
            effort += 3
        
        # Breaking changes likely
        if self.is_major_version_change(vulnerability):
            effort += 3
        
        # Requires code changes
        if vulnerability.get('requires_code_change', False):
            effort += 2
        
        # Multiple dependencies affected
        if len(vulnerability.get('affected_dependencies', [])) > 1:
            effort += 1
        
        return min(10, effort)
    
    def generate_remediation_plan(self) -> Dict:
        """Generate actionable remediation plan"""
        prioritized = self.prioritize_vulnerabilities()
        
        plan = {
            'immediate_actions': [],   # Risk score > 8
            'short_term': [],         # Risk score 5-8
            'medium_term': [],        # Risk score 3-5
            'long_term': [],          # Risk score < 3
            'accept_risk': []         # Cannot be fixed
        }
        
        for vuln in prioritized:
            if not vuln['vulnerability'].get('fixed_version'):
                plan['accept_risk'].append(self.create_risk_acceptance(vuln))
            elif vuln['risk_score'] > 8:
                plan['immediate_actions'].append(self.create_remediation_task(vuln))
            elif vuln['risk_score'] > 5:
                plan['short_term'].append(self.create_remediation_task(vuln))
            elif vuln['risk_score'] > 3:
                plan['medium_term'].append(self.create_remediation_task(vuln))
            else:
                plan['long_term'].append(self.create_remediation_task(vuln))
        
        return plan
    
    def create_remediation_task(self, vuln_priority: Dict) -> Dict:
        """Create detailed remediation task"""
        vuln = vuln_priority['vulnerability']
        
        return {
            'task_id': f"REM-{vuln['cve_id']}-{hash(vuln_priority['image'])}",
            'title': f"Fix {vuln['cve_id']} in {vuln_priority['image']}",
            'risk_score': vuln_priority['risk_score'],
            'effort_estimate': vuln_priority['remediation_effort'],
            'steps': self.generate_remediation_steps(vuln, vuln_priority['image']),
            'validation': self.generate_validation_steps(vuln),
            'rollback_plan': self.generate_rollback_plan(vuln_priority['image'])
        }