Building Automated Compliance Scanning Workflows

Building Automated Compliance Scanning Workflows

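Automated compliance scanning ensures that every container image is assessed against the same security policies. The scanner below combines a Trivy vulnerability report with Docker image configuration checks and records the results as evidence: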

#!/usr/bin/env python3
# compliance-scanner.py

import json
import subprocess
from datetime import datetime
from typing import Dict, List
import hashlib
import os

class ComplianceScanner:
    """Scan container images and evaluate the findings against compliance frameworks.

    A scan combines a Trivy vulnerability report with the image configuration
    reported by ``docker inspect``, evaluates both against the built-in policy
    table, and writes evidence artifacts below ``EVIDENCE_ROOT``.

    The scanning methods shell out to the ``trivy`` and ``docker`` command-line
    tools, which must be available on ``PATH``.
    """

    # Root directory; each scan gets its own sub-directory named after its scan id.
    EVIDENCE_ROOT = "./compliance-evidence"

    # Severity buckets counted in the vulnerability summary. Trivy reports
    # UNKNOWN in addition to the four graded severities.
    SEVERITY_BUCKETS = ('critical', 'high', 'medium', 'low', 'unknown')

    # Upper-case substrings that mark an environment variable name as likely
    # to carry a secret.
    SENSITIVE_ENV_MARKERS = (
        'PASSWORD', 'PASSWD', 'SECRET', 'TOKEN', 'API_KEY', 'APIKEY',
        'PRIVATE_KEY', 'ACCESS_KEY', 'CREDENTIAL',
    )

    # Seconds to wait for ``docker trust inspect`` before treating the image as unsigned.
    SIGNATURE_CHECK_TIMEOUT = 60

    def __init__(self, compliance_frameworks: List[str]):
        """Create a scanner.

        Args:
            compliance_frameworks: Names of the frameworks to evaluate, e.g.
                ``['pci_dss', 'cis_docker']``. Names must match keys of the
                table returned by :meth:`load_compliance_policies`.
        """
        self.frameworks = compliance_frameworks
        self.scan_results = {}
        self.compliance_policies = self.load_compliance_policies()

    def load_compliance_policies(self) -> Dict:
        """Return the built-in policy table, keyed by framework name."""
        return {
            'pci_dss': {
                'max_critical_vulns': 0,
                'max_high_vulns': 0,
                'remediation_sla_days': {
                    'critical': 1,
                    'high': 7,
                    'medium': 30
                },
                'required_scans_per_month': 4,
                'require_signed_images': True
            },
            'hipaa': {
                'max_critical_vulns': 0,
                'max_high_vulns': 5,
                'require_encryption': True,
                'require_access_logs': True,
                'audit_retention_days': 2190  # 6 years
            },
            'soc2': {
                'continuous_monitoring': True,
                'change_detection': True,
                'vulnerability_tracking': True,
                'evidence_collection': True
            },
            'cis_docker': {
                'benchmark_version': '1.4.0',
                'required_checks': [
                    'no_root_user',
                    'user_namespace_enabled',
                    'default_seccomp_profile',
                    'apparmor_enabled',
                    'no_privileged_containers'
                ]
            }
        }

    def scan_image_compliance(self, image_name: str) -> Dict:
        """Run a full compliance scan of ``image_name``.

        Args:
            image_name: Image reference understood by both ``trivy`` and ``docker``.

        Returns:
            A dict with the scan id, image, timestamp, vulnerability summary,
            configuration checks, per-framework compliance status and the
            paths of the generated evidence artifacts.

        Raises:
            RuntimeError: If ``trivy`` or ``docker inspect`` exits with an error.
        """
        # One clock reading so the scan id and the recorded timestamp agree.
        started = datetime.now()
        scan_id = hashlib.sha256(f"{image_name}{started}".encode()).hexdigest()[:12]

        results = {
            'scan_id': scan_id,
            'image': image_name,
            'scan_timestamp': started.isoformat(),
            'compliance_status': {},
            'vulnerabilities': self.scan_vulnerabilities(image_name),
            'configuration': self.scan_configuration(image_name),
            'evidence': {}
        }

        # Check compliance for each framework
        for framework in self.frameworks:
            results['compliance_status'][framework] = self.check_framework_compliance(
                framework, results
            )

        # Generate evidence artifacts
        results['evidence'] = self.generate_evidence(results)

        return results

    def scan_vulnerabilities(self, image_name: str) -> Dict:
        """Scan ``image_name`` with Trivy and summarise the findings.

        Returns:
            A dict with ``total``, one counter per entry of
            ``SEVERITY_BUCKETS`` and a ``details`` list.

        Raises:
            RuntimeError: If Trivy exits with a non-zero status.
        """
        cmd = ['trivy', 'image', '--format', 'json', '--quiet', image_name]
        completed = subprocess.run(cmd, capture_output=True, text=True)
        if completed.returncode != 0:
            # Surface the tool's own message instead of a JSON parse error on empty output.
            raise RuntimeError(
                f"trivy scan of {image_name!r} failed "
                f"(exit code {completed.returncode}): {completed.stderr.strip()}"
            )

        return self._summarize_trivy_report(json.loads(completed.stdout))

    def _summarize_trivy_report(self, trivy_output: Dict) -> Dict:
        """Count and flatten the vulnerabilities of a parsed Trivy JSON report."""
        vuln_summary = {'total': 0}
        for bucket in self.SEVERITY_BUCKETS:
            vuln_summary[bucket] = 0
        vuln_summary['details'] = []

        # Trivy emits JSON null (not an empty list) for targets without
        # findings, so a plain .get(key, []) default is not enough.
        for target in trivy_output.get('Results') or []:
            for vuln in target.get('Vulnerabilities') or []:
                severity = str(vuln.get('Severity') or 'UNKNOWN')
                bucket = severity.lower()
                if bucket not in self.SEVERITY_BUCKETS:
                    bucket = 'unknown'

                vuln_summary['total'] += 1
                vuln_summary[bucket] += 1

                vuln_summary['details'].append({
                    'id': vuln['VulnerabilityID'],
                    'package': vuln['PkgName'],
                    'severity': severity,
                    'fixed_version': vuln.get('FixedVersion'),
                    'published_date': vuln.get('PublishedDate'),
                    'layer': target.get('Target')
                })

        return vuln_summary

    def scan_configuration(self, image_name: str) -> Dict:
        """Inspect ``image_name`` with ``docker inspect`` and derive configuration checks.

        Raises:
            RuntimeError: If ``docker inspect`` exits with a non-zero status
                (for example because the image is not present locally).
        """
        inspect_cmd = ['docker', 'inspect', image_name]
        completed = subprocess.run(inspect_cmd, capture_output=True, text=True)
        if completed.returncode != 0:
            raise RuntimeError(
                f"docker inspect of {image_name!r} failed "
                f"(exit code {completed.returncode}): {completed.stderr.strip()}"
            )

        # docker inspect prints a JSON array with one entry per inspected object.
        image_config = json.loads(completed.stdout)[0]
        return self._evaluate_image_config(image_config)

    def _evaluate_image_config(self, image_config: Dict) -> Dict:
        """Build the configuration check dict from one ``docker inspect`` entry."""
        # Fields may be missing or JSON null; normalise both to empty values.
        config = image_config.get('Config') or {}

        return {
            'runs_as_root': self.check_root_user(image_config),
            'has_healthcheck': bool(config.get('Healthcheck')),
            'uses_privileged': False,  # Would need runtime info
            'exposed_ports': list((config.get('ExposedPorts') or {}).keys()),
            'environment_vars': self.check_sensitive_env_vars(image_config),
            'user_defined': (config.get('User') or '') != '',
            'working_dir_defined': (config.get('WorkingDir') or '') != ''
        }

    def check_root_user(self, image_config: Dict) -> bool:
        """Return True when the image's configured user is root.

        An empty or missing ``Config.User`` counts as root because Docker
        runs the container as root when no user is configured. A
        ``user:group`` value is judged by its user part only.
        """
        config = image_config.get('Config') or {}
        user = str(config.get('User') or '').split(':', 1)[0].strip()
        return user in ('', 'root', '0')

    def check_sensitive_env_vars(self, image_config: Dict) -> List[str]:
        """Return the names of environment variables that look like secrets.

        Only names are returned, never values, so that secrets are not
        copied into the evidence files. A name is flagged when it contains
        one of ``SENSITIVE_ENV_MARKERS`` (case-insensitive).
        """
        config = image_config.get('Config') or {}
        flagged = []
        for entry in config.get('Env') or []:
            # Entries have the form NAME=value.
            name = str(entry).split('=', 1)[0]
            upper_name = name.upper()
            if any(marker in upper_name for marker in self.SENSITIVE_ENV_MARKERS):
                flagged.append(name)
        return flagged

    def check_image_signature(self, image_name: str) -> bool:
        """Return True when Docker Content Trust reports a signed tag for the image.

        Uses ``docker trust inspect``. The check fails closed: a missing
        ``docker`` binary, a timeout, a non-zero exit status, unparsable
        output or an empty ``SignedTags`` list all count as "not signed".
        Override this method to verify signatures with another tool.
        """
        try:
            completed = subprocess.run(
                ['docker', 'trust', 'inspect', image_name],
                capture_output=True,
                text=True,
                timeout=self.SIGNATURE_CHECK_TIMEOUT,
            )
        except (OSError, subprocess.TimeoutExpired):
            return False

        if completed.returncode != 0:
            return False

        try:
            trust_data = json.loads(completed.stdout or '[]')
        except json.JSONDecodeError:
            return False

        if not isinstance(trust_data, list):
            return False
        return any(
            isinstance(entry, dict) and entry.get('SignedTags')
            for entry in trust_data
        )

    def check_framework_compliance(self, framework: str, scan_results: Dict) -> Dict:
        """Evaluate ``scan_results`` against one framework's policy.

        Args:
            framework: Framework name; unknown names are evaluated against
                an empty policy.
            scan_results: Dict with at least ``image``, ``vulnerabilities``
                and ``configuration`` as built by :meth:`scan_image_compliance`.

        Returns:
            A dict with ``compliant``, ``violations``, ``warnings`` and
            ``framework_version``.
        """
        policy = self.compliance_policies.get(framework, {})

        compliance_result = {
            'compliant': True,
            'violations': [],
            'warnings': [],
            # cis_docker records its version under 'benchmark_version'.
            'framework_version': policy.get(
                'version', policy.get('benchmark_version', 'latest')
            )
        }

        # Enforce every vulnerability limit the policy declares
        # (pci_dss and hipaa both define max_critical_vulns / max_high_vulns).
        self._check_vulnerability_limits(policy, scan_results, compliance_result)

        if framework == 'pci_dss':
            # Check image signing
            if policy['require_signed_images'] and not self.check_image_signature(scan_results['image']):
                compliance_result['compliant'] = False
                compliance_result['violations'].append({
                    'rule': 'image_signing',
                    'severity': 'high',
                    'message': 'Image is not signed'
                })

        elif framework == 'cis_docker':
            # Check CIS Docker Benchmark items
            config = scan_results['configuration']

            if config['runs_as_root']:
                compliance_result['compliant'] = False
                compliance_result['violations'].append({
                    'rule': 'cis_5_2',
                    'severity': 'medium',
                    'message': 'Container runs as root user'
                })

            if not config['user_defined']:
                compliance_result['warnings'].append({
                    'rule': 'cis_5_3',
                    'severity': 'low',
                    'message': 'No user defined in Dockerfile'
                })

        return compliance_result

    def _check_vulnerability_limits(self, policy: Dict, scan_results: Dict,
                                    compliance_result: Dict) -> None:
        """Record a violation for each ``max_<severity>_vulns`` limit that is exceeded."""
        for severity in ('critical', 'high'):
            limit = policy.get(f'max_{severity}_vulns')
            if limit is None:
                continue

            found = scan_results['vulnerabilities'][severity]
            if found > limit:
                compliance_result['compliant'] = False
                compliance_result['violations'].append({
                    'rule': 'vulnerability_count',
                    'severity': severity,
                    'message': f"Found {found} {severity} vulnerabilities, max allowed: {limit}"
                })

    def _evidence_dir(self, scan_id: str) -> str:
        """Return the evidence directory path for ``scan_id``."""
        return f"{self.EVIDENCE_ROOT}/{scan_id}"

    def generate_evidence(self, scan_results: Dict) -> Dict:
        """Write the evidence artifacts for a scan and return their paths.

        Writes ``scan-results.json``, ``summary-report.txt``, a CycloneDX
        SBOM and ``attestation.json`` into the scan's evidence directory.

        Returns:
            A dict mapping ``scan_results``, ``summary_report``, ``sbom``
            and ``attestation`` to file paths. ``sbom`` is ``None`` when
            Trivy could not produce the SBOM.
        """
        evidence_dir = self._evidence_dir(scan_results['scan_id'])
        os.makedirs(evidence_dir, exist_ok=True)

        # Save scan results
        results_path = f"{evidence_dir}/scan-results.json"
        with open(results_path, 'w', encoding='utf-8') as f:
            json.dump(scan_results, f, indent=2)

        # Generate summary report
        summary_path = f"{evidence_dir}/summary-report.txt"
        with open(summary_path, 'w', encoding='utf-8') as f:
            f.write(self.generate_summary_report(scan_results))

        # Create SBOM. Trivy's cyclonedx format is JSON, so the file is
        # named accordingly rather than .xml.
        sbom_path = f"{evidence_dir}/sbom.cdx.json"
        sbom_cmd = [
            'trivy', 'image',
            '--format', 'cyclonedx',
            '--output', sbom_path,
            scan_results['image']
        ]
        sbom_run = subprocess.run(sbom_cmd, capture_output=True)
        # SBOM generation stays best-effort, but never advertise a file
        # that was not written.
        sbom_written = sbom_run.returncode == 0 and os.path.exists(sbom_path)

        return {
            'scan_results': results_path,
            'summary_report': summary_path,
            'sbom': sbom_path if sbom_written else None,
            'attestation': self.generate_attestation(scan_results)
        }

    def generate_attestation(self, scan_results: Dict) -> str:
        """Write an integrity record for the scan and return its path.

        The record stores the SHA-256 of ``scan_results`` serialised the
        same way :meth:`generate_evidence` writes ``scan-results.json``,
        together with the per-framework verdicts. It is an unsigned digest
        record, not a cryptographic signature; sign the file with external
        tooling if non-repudiation is required.
        """
        evidence_dir = self._evidence_dir(scan_results['scan_id'])
        os.makedirs(evidence_dir, exist_ok=True)

        serialized = json.dumps(scan_results, indent=2).encode('utf-8')
        attestation = {
            'scan_id': scan_results['scan_id'],
            'image': scan_results['image'],
            'scan_timestamp': scan_results['scan_timestamp'],
            'results_sha256': hashlib.sha256(serialized).hexdigest(),
            'compliance': {
                framework: status['compliant']
                for framework, status in scan_results['compliance_status'].items()
            }
        }

        attestation_path = f"{evidence_dir}/attestation.json"
        with open(attestation_path, 'w', encoding='utf-8') as f:
            json.dump(attestation, f, indent=2)
        return attestation_path

    def generate_summary_report(self, scan_results: Dict) -> str:
        """Render a human-readable compliance summary for ``scan_results``.

        The ``Unknown`` severity line is only emitted when the summary
        contains a non-zero ``unknown`` counter.
        """
        vulns = scan_results['vulnerabilities']

        lines = [
            "",
            "Container Security Compliance Report",
            "====================================",
            f"Scan ID: {scan_results['scan_id']}",
            f"Image: {scan_results['image']}",
            f"Scan Date: {scan_results['scan_timestamp']}",
            "",
            "Vulnerability Summary:",
            f"- Critical: {vulns['critical']}",
            f"- High: {vulns['high']}",
            f"- Medium: {vulns['medium']}",
            f"- Low: {vulns['low']}",
        ]
        if vulns.get('unknown'):
            lines.append(f"- Unknown: {vulns['unknown']}")
        lines.extend([
            f"- Total: {vulns['total']}",
            "",
            "Compliance Status:",
        ])

        for framework, status in scan_results['compliance_status'].items():
            lines.append("")
            lines.append(f"{framework.upper()}:")
            lines.append(f"  Status: {'COMPLIANT' if status['compliant'] else 'NON-COMPLIANT'}")

            if status['violations']:
                lines.append("  Violations:")
                lines.extend(
                    f"    - [{violation['severity']}] {violation['message']}"
                    for violation in status['violations']
                )

            if status['warnings']:
                lines.append("  Warnings:")
                lines.extend(
                    f"    - [{warning['severity']}] {warning['message']}"
                    for warning in status['warnings']
                )

        return "\n".join(lines) + "\n"

# Usage example. Guarded so the scan (which shells out to trivy and docker)
# runs only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    scanner = ComplianceScanner(['pci_dss', 'cis_docker', 'hipaa'])
    results = scanner.scan_image_compliance('myapp:latest')
    print(json.dumps(results['compliance_status'], indent=2))