Building Automated Compliance Scanning Workflows
Automated compliance scanning ensures consistent security assessment across all container images:
#!/usr/bin/env python3
# compliance-scanner.py
import json
import subprocess
from datetime import datetime
from typing import Dict, List
import hashlib
import os
class ComplianceScanner:
    """Scan container images and grade the findings against compliance policies.

    A scan shells out to ``trivy`` (vulnerabilities, SBOM) and ``docker``
    (image configuration), evaluates the collected data against the policy of
    every requested framework, and writes evidence files below
    ``./compliance-evidence/<scan_id>``.

    ``check_image_signature`` and ``generate_attestation`` are deliberately
    simple baseline implementations; override them in a subclass to plug in a
    real signature verifier or a signed attestation format.
    """

    # Buckets counted by the vulnerability summary. ``unknown`` collects
    # Trivy's ``UNKNOWN`` severity and any value outside the graded levels,
    # which would otherwise raise ``KeyError`` while counting.
    _SEVERITY_BUCKETS = ('critical', 'high', 'medium', 'low', 'unknown')

    # Substrings that mark an environment variable name as likely to hold a secret.
    _SENSITIVE_ENV_MARKERS = (
        'PASSWORD', 'PASSWD', 'SECRET', 'TOKEN',
        'API_KEY', 'APIKEY', 'PRIVATE_KEY', 'CREDENTIAL',
    )

    def __init__(self, compliance_frameworks: List[str]):
        """Create a scanner for the given framework names.

        Args:
            compliance_frameworks: Keys of the policy table returned by
                ``load_compliance_policies`` (e.g. ``'pci_dss'``).
        """
        self.frameworks = compliance_frameworks
        # Not populated by any method of this class; kept for callers that
        # may read or fill it.
        self.scan_results = {}
        self.compliance_policies = self.load_compliance_policies()

    def load_compliance_policies(self) -> Dict:
        """Load compliance policies for different frameworks."""
        return {
            'pci_dss': {
                'max_critical_vulns': 0,
                'max_high_vulns': 0,
                'remediation_sla_days': {
                    'critical': 1,
                    'high': 7,
                    'medium': 30
                },
                'required_scans_per_month': 4,
                'require_signed_images': True
            },
            'hipaa': {
                'max_critical_vulns': 0,
                'max_high_vulns': 5,
                'require_encryption': True,
                'require_access_logs': True,
                'audit_retention_days': 2190  # 6 years
            },
            'soc2': {
                'continuous_monitoring': True,
                'change_detection': True,
                'vulnerability_tracking': True,
                'evidence_collection': True
            },
            'cis_docker': {
                'benchmark_version': '1.4.0',
                'required_checks': [
                    'no_root_user',
                    'user_namespace_enabled',
                    'default_seccomp_profile',
                    'apparmor_enabled',
                    'no_privileged_containers'
                ]
            }
        }

    def scan_image_compliance(self, image_name: str) -> Dict:
        """Run a full compliance scan for one image.

        Args:
            image_name: Image reference passed to ``trivy`` and ``docker``.

        Returns:
            A dict with ``scan_id``, ``image``, ``scan_timestamp``,
            ``vulnerabilities``, ``configuration``, one ``compliance_status``
            entry per configured framework, and the ``evidence`` locations.

        Raises:
            FileNotFoundError: ``trivy`` or ``docker`` is not installed.
            RuntimeError: One of the external tools exited with an error.
        """
        # One timezone-aware instant feeds both the id and the timestamp so
        # the two can never disagree and the evidence is unambiguous.
        scanned_at = datetime.now().astimezone()
        scan_id = hashlib.sha256(f"{image_name}{scanned_at}".encode()).hexdigest()[:12]
        results = {
            'scan_id': scan_id,
            'image': image_name,
            'scan_timestamp': scanned_at.isoformat(),
            'compliance_status': {},
            'vulnerabilities': self.scan_vulnerabilities(image_name),
            'configuration': self.scan_configuration(image_name),
            'evidence': {}
        }
        # Check compliance for each framework
        for framework in self.frameworks:
            results['compliance_status'][framework] = self.check_framework_compliance(
                framework, results
            )
        # Generate evidence artifacts
        results['evidence'] = self.generate_evidence(results)
        return results

    @staticmethod
    def _run_json(cmd: List[str]):
        """Run *cmd* and return its stdout parsed as JSON.

        Raises:
            FileNotFoundError: The executable does not exist.
            RuntimeError: The command exited with a non-zero status; the
                message carries the tool's stderr.
            json.JSONDecodeError: The command succeeded but printed invalid JSON.
        """
        try:
            completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as err:
            detail = (err.stderr or '').strip()
            raise RuntimeError(
                f"{cmd[0]} exited with status {err.returncode}: {detail}"
            ) from err
        return json.loads(completed.stdout)

    @classmethod
    def _summarize_vulnerabilities(cls, trivy_output) -> Dict:
        """Fold a parsed Trivy JSON report into per-severity counts and details."""
        vuln_summary = {bucket: 0 for bucket in cls._SEVERITY_BUCKETS}
        vuln_summary['total'] = 0
        vuln_summary['details'] = []

        # NOTE(review): older Trivy releases are believed to emit a bare list
        # of results instead of an object with a "Results" key; accept both.
        if isinstance(trivy_output, list):
            targets = trivy_output
        else:
            targets = (trivy_output or {}).get('Results') or []

        for target in targets:
            # "Vulnerabilities" may be absent or null for clean targets.
            for vuln in target.get('Vulnerabilities') or []:
                severity = vuln.get('Severity') or 'UNKNOWN'
                bucket = severity.lower()
                if bucket not in cls._SEVERITY_BUCKETS:
                    bucket = 'unknown'
                vuln_summary['total'] += 1
                vuln_summary[bucket] += 1
                vuln_summary['details'].append({
                    'id': vuln['VulnerabilityID'],
                    'package': vuln['PkgName'],
                    'severity': severity,
                    'fixed_version': vuln.get('FixedVersion'),
                    'published_date': vuln.get('PublishedDate'),
                    'layer': target.get('Target')
                })
        return vuln_summary

    def scan_vulnerabilities(self, image_name: str) -> Dict:
        """Scan for vulnerabilities using Trivy.

        Returns:
            Counts under ``total``, ``critical``, ``high``, ``medium``,
            ``low`` and ``unknown`` plus a ``details`` list with one entry
            per finding.

        Raises:
            FileNotFoundError: ``trivy`` is not installed.
            RuntimeError: ``trivy`` exited with an error.
        """
        cmd = ['trivy', 'image', '--format', 'json', '--quiet', image_name]
        return self._summarize_vulnerabilities(self._run_json(cmd))

    def scan_configuration(self, image_name: str) -> Dict:
        """Inspect the image with ``docker inspect`` and derive configuration checks.

        Raises:
            FileNotFoundError: ``docker`` is not installed.
            RuntimeError: ``docker inspect`` failed (for example, unknown image).
        """
        image_config = self._run_json(['docker', 'inspect', image_name])[0]
        # Tolerate a missing or null "Config" section and null sub-fields.
        config = image_config.get('Config') or {}
        return {
            'runs_as_root': self.check_root_user(image_config),
            'has_healthcheck': 'Healthcheck' in config,
            'uses_privileged': False,  # Would need runtime info
            'exposed_ports': list((config.get('ExposedPorts') or {}).keys()),
            'environment_vars': self.check_sensitive_env_vars(image_config),
            'user_defined': (config.get('User') or '') != '',
            'working_dir_defined': (config.get('WorkingDir') or '') != ''
        }

    def check_root_user(self, image_config: Dict) -> bool:
        """Return True when the image's configured user is root.

        An empty ``Config.User`` counts as root because that is what the
        container runtime falls back to. A ``user:group`` value is judged by
        its user part only.
        """
        config = image_config.get('Config') or {}
        user = (config.get('User') or '').strip().split(':', 1)[0]
        return user in ('', 'root', '0')

    def check_sensitive_env_vars(self, image_config: Dict) -> List[str]:
        """Return the names of baked-in environment variables that look secret.

        Only names are returned, never values, so the result is safe to store
        in evidence files.
        """
        config = image_config.get('Config') or {}
        names = [entry.partition('=')[0] for entry in config.get('Env') or []]
        return [
            name for name in names
            if any(marker in name.upper() for marker in self._SENSITIVE_ENV_MARKERS)
        ]

    def check_image_signature(self, image_name: str) -> bool:
        """Report whether *image_name* carries a verified signature.

        This baseline performs no verification and fails closed: every image
        is treated as unsigned until a subclass overrides this method with a
        real verifier.
        """
        return False

    @staticmethod
    def _add_violation(compliance_result: Dict, rule: str, severity: str, message: str) -> None:
        """Record a violation and mark the framework result as non-compliant."""
        compliance_result['compliant'] = False
        compliance_result['violations'].append({
            'rule': rule,
            'severity': severity,
            'message': message
        })

    def check_framework_compliance(self, framework: str, scan_results: Dict) -> Dict:
        """Check scan results against one framework's policy.

        Vulnerability limits (``max_critical_vulns`` / ``max_high_vulns``) and
        ``require_signed_images`` are enforced for every policy that declares
        them; the CIS configuration checks apply to ``cis_docker`` only.

        Args:
            framework: Policy key such as ``'pci_dss'``.
            scan_results: Dict holding ``image``, ``vulnerabilities`` and
                ``configuration`` as built by ``scan_image_compliance``.

        Returns:
            Dict with ``compliant``, ``violations``, ``warnings`` and
            ``framework_version``.
        """
        policy = self.compliance_policies.get(framework, {})
        compliance_result = {
            'compliant': True,
            'violations': [],
            'warnings': [],
            # cis_docker stores its version under 'benchmark_version'.
            'framework_version': policy.get(
                'version', policy.get('benchmark_version', 'latest')
            )
        }
        checks_run = 0

        # Check vulnerability counts
        for severity in ('critical', 'high'):
            limit = policy.get(f'max_{severity}_vulns')
            if limit is None:
                continue
            checks_run += 1
            found = scan_results['vulnerabilities'][severity]
            if found > limit:
                self._add_violation(
                    compliance_result, 'vulnerability_count', severity,
                    f"Found {found} {severity} vulnerabilities, max allowed: {limit}"
                )

        # Check image signing
        if policy.get('require_signed_images'):
            checks_run += 1
            if not self.check_image_signature(scan_results['image']):
                self._add_violation(
                    compliance_result, 'image_signing', 'high', 'Image is not signed'
                )

        if framework == 'cis_docker':
            # Check CIS Docker Benchmark items
            checks_run += 1
            config = scan_results['configuration']
            if config['runs_as_root']:
                self._add_violation(
                    compliance_result, 'cis_5_2', 'medium', 'Container runs as root user'
                )
            if not config['user_defined']:
                compliance_result['warnings'].append({
                    'rule': 'cis_5_3',
                    'severity': 'low',
                    'message': 'No user defined in Dockerfile'
                })

        if checks_run == 0:
            # Without this note a framework with no evaluable rule would read
            # as a clean pass even though nothing was actually verified.
            compliance_result['warnings'].append({
                'rule': 'no_automated_checks',
                'severity': 'low',
                'message': f"No automated checks were evaluated for framework '{framework}'"
            })
        return compliance_result

    @staticmethod
    def _serialize_results(scan_results: Dict) -> bytes:
        """Return the exact bytes stored in ``scan-results.json``."""
        return json.dumps(scan_results, indent=2).encode('utf-8')

    def generate_attestation(self, scan_results: Dict) -> Dict:
        """Build an unsigned attestation record for *scan_results*.

        The digest covers the same serialisation that ``generate_evidence``
        writes to ``scan-results.json``, so the stored file can be re-hashed
        to detect tampering. The record itself is not signed.
        """
        return {
            'scan_id': scan_results.get('scan_id'),
            'image': scan_results.get('image'),
            'generated_at': scan_results.get('scan_timestamp'),
            'results_sha256': hashlib.sha256(
                self._serialize_results(scan_results)
            ).hexdigest()
        }

    def generate_evidence(self, scan_results: Dict) -> Dict:
        """Write evidence artifacts and return where they are.

        Creates ``./compliance-evidence/<scan_id>/`` holding the raw scan
        results, the text summary and an SBOM produced by ``trivy``.

        Returns:
            Dict with ``scan_results``, ``summary_report`` and ``sbom`` paths
            plus the ``attestation`` record. ``sbom`` is ``None`` when Trivy
            failed to produce it, so a missing artifact is never advertised.

        Raises:
            FileNotFoundError: ``trivy`` is not installed.
            OSError: The evidence directory or files cannot be written.
        """
        evidence_dir = f"./compliance-evidence/{scan_results['scan_id']}"
        os.makedirs(evidence_dir, exist_ok=True)
        results_path = f"{evidence_dir}/scan-results.json"
        summary_path = f"{evidence_dir}/summary-report.txt"
        sbom_path = f"{evidence_dir}/sbom.xml"

        # Save scan results (binary mode keeps the bytes identical to what the
        # attestation digest is computed over, regardless of platform).
        with open(results_path, 'wb') as f:
            f.write(self._serialize_results(scan_results))

        # Generate summary report
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.write(self.generate_summary_report(scan_results))

        # Create SBOM (best effort: a failure must not discard the other evidence).
        # NOTE(review): Trivy's cyclonedx format is believed to emit JSON; the
        # ".xml" name is kept for compatibility with existing consumers - confirm.
        sbom_cmd = [
            'trivy', 'image',
            '--format', 'cyclonedx',
            '--output', sbom_path,
            scan_results['image']
        ]
        sbom_run = subprocess.run(sbom_cmd, capture_output=True)

        return {
            'scan_results': results_path,
            'summary_report': summary_path,
            'sbom': sbom_path if sbom_run.returncode == 0 else None,
            'attestation': self.generate_attestation(scan_results)
        }

    def generate_summary_report(self, scan_results: Dict) -> str:
        """Generate human-readable compliance summary."""
        vulns = scan_results['vulnerabilities']
        header = [
            "",
            "Container Security Compliance Report",
            "====================================",
            f"Scan ID: {scan_results['scan_id']}",
            f"Image: {scan_results['image']}",
            f"Scan Date: {scan_results['scan_timestamp']}",
            "Vulnerability Summary:",
            f"- Critical: {vulns['critical']}",
            f"- High: {vulns['high']}",
            f"- Medium: {vulns['medium']}",
            f"- Low: {vulns['low']}",
            f"- Total: {vulns['total']}",
            "Compliance Status:",
            "",
        ]
        parts = ["\n".join(header)]
        for framework, status in scan_results['compliance_status'].items():
            parts.append(f"\n{framework.upper()}:\n")
            parts.append(
                f" Status: {'COMPLIANT' if status['compliant'] else 'NON-COMPLIANT'}\n"
            )
            if status['violations']:
                parts.append(" Violations:\n")
                parts.extend(
                    f" - [{violation['severity']}] {violation['message']}\n"
                    for violation in status['violations']
                )
            if status['warnings']:
                parts.append(" Warnings:\n")
                parts.extend(
                    f" - [{warning['severity']}] {warning['message']}\n"
                    for warning in status['warnings']
                )
        return "".join(parts)
# Usage example
if __name__ == "__main__":
    # Guarded so that importing this module never launches trivy/docker or
    # writes evidence files as a side effect; running the script is unchanged.
    scanner = ComplianceScanner(['pci_dss', 'cis_docker', 'hipaa'])
    results = scanner.scan_image_compliance('myapp:latest')
    print(json.dumps(results['compliance_status'], indent=2))