Prioritizing Vulnerabilities for Remediation
Prioritizing Vulnerabilities for Remediation
Not all vulnerabilities require immediate attention. Effective remediation strategies begin with intelligent prioritization:
#!/usr/bin/env python3
# vulnerability-prioritization.py
import json
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import requests
class VulnerabilityPrioritizer:
def __init__(self, scan_results: List[Dict]):
    """Store the scan results and load the scoring lookup tables.

    Args:
        scan_results: One dict per scanned image. ``prioritize_vulnerabilities``
            iterates this value and reads each entry's ``'image'`` key plus
            the optional ``'vulnerabilities'`` and ``'namespace'`` keys, so
            it has to be a sequence of dicts rather than a single dict.
    """
    self.scan_results = scan_results
    # Both tables are loaded once here so that scoring later on is a
    # plain dictionary lookup.
    self.epss_data = self.load_epss_scores()
    self.asset_criticality = self.load_asset_criticality()
def load_epss_scores(self) -> Dict:
    """Return the EPSS (Exploit Prediction Scoring System) lookup table.

    Returns:
        A dict mapping CVE identifier to its EPSS value. The table here is
        a hard-coded sample.
    """
    # In production, this would fetch from FIRST.org EPSS API
    epss_by_cve = {
        "CVE-2023-12345": 0.97,  # High likelihood of exploitation
        "CVE-2023-23456": 0.12,  # Low likelihood
        # ... more CVE scores
    }
    return epss_by_cve
def load_asset_criticality(self) -> Dict:
    """Return the per-service criticality ratings.

    Returns:
        A dict mapping service name to an integer rating, where a larger
        number means a more important asset. The table here is a
        hard-coded sample.
    """
    criticality_by_service = {
        "payment-service": 10,  # Critical
        "blog-app": 3,  # Low importance
        "api-gateway": 9,  # High importance
        # ... more ratings
    }
    return criticality_by_service
def calculate_risk_score(self, vulnerability: Dict, image_context: Dict) -> float:
    """Calculate a contextualized risk score for one vulnerability.

    The score is a weighted sum of six components; every component is
    brought onto a 0-10 scale before its weight is applied, and the
    weights add up to 1.0:

    * CVSS base score ........ 0.25
    * EPSS probability ....... 0.25
    * asset criticality ...... 0.20
    * exposure factor ........ 0.15
    * fix availability ....... 0.10
    * vulnerability age ...... 0.05

    Args:
        vulnerability: Finding dict. ``'cve_id'`` is required;
            ``'cvss_score'`` and ``'fixed_version'`` are optional.
        image_context: Deployment context. ``'service_name'`` is required;
            the dict is also handed to ``calculate_exposure_factor``.

    Returns:
        The weighted score, rounded to two decimal places.

    Raises:
        KeyError: If ``'cve_id'`` or ``'service_name'`` is missing.
    """
    # Base CVSS score (0-10). ``or 0`` also covers an explicit None, which
    # ``.get('cvss_score', 0)`` would pass through and break the arithmetic.
    cvss_score = vulnerability.get('cvss_score') or 0

    # EPSS score (0-1, probability of exploitation); unknown CVEs get 0.5.
    epss_score = self.epss_data.get(vulnerability['cve_id'], 0.5)

    # Asset criticality (0-10); unknown services get the midpoint 5.
    asset_criticality = self.asset_criticality.get(
        image_context['service_name'], 5
    )

    # Exposure factor (0-1)
    exposure_factor = self.calculate_exposure_factor(image_context)

    # Fix availability factor (0-1): a finding with no fixed version
    # contributes more (0.8) than one that can simply be patched (0.2).
    fix_factor = 0.2 if vulnerability.get('fixed_version') else 0.8

    # Age factor (older = higher priority), saturating at one year.
    # Clamped at 0 as well so a negative age cannot lower the score.
    age_days = self.calculate_vulnerability_age(vulnerability)
    age_factor = max(0.0, min(1.0, age_days / 365))

    # Calculate weighted risk score
    risk_score = (
        (cvss_score * 0.25) +
        (epss_score * 10 * 0.25) +
        (asset_criticality * 0.20) +
        (exposure_factor * 10 * 0.15) +
        (fix_factor * 10 * 0.10) +
        (age_factor * 10 * 0.05)
    )
    return round(risk_score, 2)
def calculate_exposure_factor(self, image_context: Dict) -> float:
    """Translate the deployment's exposure level into a 0-1 factor.

    Args:
        image_context: Deployment context; only ``'exposure_level'`` is read.

    Returns:
        The factor for the given level. A missing ``'exposure_level'`` is
        treated as ``'internal_only'``, and a level that is not in the
        table yields 0.5.
    """
    weight_by_level = {
        'internet_facing': 1.0,
        'internal_only': 0.5,
        'development': 0.2,
        'air_gapped': 0.1,
    }
    level = image_context.get('exposure_level', 'internal_only')
    try:
        return weight_by_level[level]
    except KeyError:
        # Unrecognised level: fall back to the internal-only weight.
        return 0.5
def prioritize_vulnerabilities(self) -> List[Dict]:
    """Build the remediation queue from ``self.scan_results``.

    Every vulnerability of every scanned image becomes one entry with the
    keys ``'vulnerability'``, ``'image'``, ``'risk_score'``,
    ``'remediation_effort'`` and ``'business_context'``.

    Returns:
        All entries, ordered by risk score (highest first); entries with
        the same score are ordered by remediation effort (lowest first).
    """
    queue = []
    for scan in self.scan_results:
        image_ref = scan['image']
        context = {
            'service_name': self.extract_service_name(image_ref),
            'exposure_level': self.determine_exposure_level(scan),
            'namespace': scan.get('namespace', 'default'),
        }
        queue.extend(
            {
                'vulnerability': finding,
                'image': image_ref,
                'risk_score': self.calculate_risk_score(finding, context),
                'remediation_effort': self.estimate_remediation_effort(finding, scan),
                'business_context': context,
            }
            for finding in scan.get('vulnerabilities', [])
        )

    # Ascending sort on (-risk, effort) gives risk descending and effort
    # ascending, with ties left in their original order.
    queue.sort(key=lambda item: (-item['risk_score'], item['remediation_effort']))
    return queue
def estimate_remediation_effort(self, vulnerability: Dict, image_result: Dict) -> int:
    """Estimate effort required for remediation (1-10 scale).

    Starts at 1 and adds points for each complicating factor:

    * +3 when the vulnerable layer is the image's base layer
    * +3 when ``is_major_version_change`` reports a major version change
    * +2 when the finding is flagged ``'requires_code_change'``
    * +1 when more than one entry is listed in ``'affected_dependencies'``

    Args:
        vulnerability: Finding dict; all keys read here are optional.
        image_result: Scan result of the image; ``'base_layer_id'`` is
            optional.

    Returns:
        The effort estimate, capped at 10.
    """
    effort = 1

    # Base image update required. The finding must actually carry a layer
    # id: comparing two missing values (None == None) would otherwise mark
    # every finding without layer data as a base-image problem.
    layer_id = vulnerability.get('layer_id')
    if layer_id is not None and layer_id == image_result.get('base_layer_id'):
        effort += 3

    # Breaking changes likely
    if self.is_major_version_change(vulnerability):
        effort += 3

    # Requires code changes
    if vulnerability.get('requires_code_change', False):
        effort += 2

    # Multiple dependencies affected (``or []`` tolerates an explicit None)
    if len(vulnerability.get('affected_dependencies') or []) > 1:
        effort += 1

    return min(10, effort)
def generate_remediation_plan(self) -> Dict:
    """Sort the prioritized findings into remediation time horizons.

    Findings without a ``'fixed_version'`` are routed to ``'accept_risk'``
    whatever their score. All others become remediation tasks in the
    first bucket whose lower bound their risk score exceeds.

    Returns:
        A dict with the keys ``'immediate_actions'``, ``'short_term'``,
        ``'medium_term'``, ``'long_term'`` and ``'accept_risk'``, each
        holding a list in priority order.
    """
    # (exclusive lower bound, bucket), checked from the highest bound down.
    tiers = (
        (8, 'immediate_actions'),  # Risk score > 8
        (5, 'short_term'),         # Risk score > 5, up to 8
        (3, 'medium_term'),        # Risk score > 3, up to 5
    )
    plan = {
        'immediate_actions': [],
        'short_term': [],
        'medium_term': [],
        'long_term': [],    # Risk score 3 or below
        'accept_risk': [],  # Cannot be fixed
    }

    for entry in self.prioritize_vulnerabilities():
        if not entry['vulnerability'].get('fixed_version'):
            plan['accept_risk'].append(self.create_risk_acceptance(entry))
            continue

        score = entry['risk_score']
        bucket = next(
            (name for floor, name in tiers if score > floor),
            'long_term',
        )
        plan[bucket].append(self.create_remediation_task(entry))

    return plan
def create_remediation_task(self, vuln_priority: Dict) -> Dict:
    """Create a detailed remediation task for one prioritized finding.

    Args:
        vuln_priority: An entry as produced by ``prioritize_vulnerabilities``;
            the keys ``'vulnerability'`` (with ``'cve_id'``), ``'image'``,
            ``'risk_score'`` and ``'remediation_effort'`` are read.

    Returns:
        A dict with ``'task_id'``, ``'title'``, ``'risk_score'``,
        ``'effort_estimate'``, ``'steps'``, ``'validation'`` and
        ``'rollback_plan'``. ``'task_id'`` has the form
        ``REM-<cve_id>-<12 hex chars>`` and is the same on every run for
        the same CVE and image.
    """
    # Imported locally so this method does not rely on a module-level
    # import being present.
    import hashlib

    vuln = vuln_priority['vulnerability']
    image = vuln_priority['image']

    # The built-in hash() of a str is salted per interpreter process and
    # can be negative, so it cannot serve as a stable identifier. A
    # truncated SHA-256 of the image reference is deterministic.
    image_digest = hashlib.sha256(str(image).encode('utf-8')).hexdigest()[:12]

    return {
        'task_id': f"REM-{vuln['cve_id']}-{image_digest}",
        'title': f"Fix {vuln['cve_id']} in {image}",
        'risk_score': vuln_priority['risk_score'],
        'effort_estimate': vuln_priority['remediation_effort'],
        'steps': self.generate_remediation_steps(vuln, image),
        'validation': self.generate_validation_steps(vuln),
        'rollback_plan': self.generate_rollback_plan(image),
    }