Automated Compliance Validation

Automated Compliance Validation

Compliance requirements often translate poorly to automated checks, but modern tools can validate many compliance controls automatically. Configuration standards, security headers, encryption requirements, and access controls can all be verified through automated testing. The challenge lies in translating regulatory language into technical validations.

#!/usr/bin/env python3
# compliance_validator.py - Automated compliance checking

import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Any

import yaml

class ComplianceFramework(Enum):
    """Compliance frameworks a validator can be bound to.

    Each member's value is the lowercase string identifier that
    ``ComplianceValidator.validate_compliance`` reports under the
    ``'framework'`` key of its results.
    """

    PCI_DSS = "pci_dss"
    HIPAA = "hipaa"
    SOC2 = "soc2"
    GDPR = "gdpr"
    NIST_800_53 = "nist_800_53"

@dataclass
class ComplianceControl:
    """A single control of a compliance framework and how it is checked."""

    # Identifier of the control; used as the key in the validation results.
    id: str
    # Human-readable text copied into the control's result entry.
    description: str
    # Framework the control belongs to; controls of other frameworks are skipped.
    framework: ComplianceFramework
    # True when the control is checked by code, False when it is reported
    # as 'manual_review'.
    automated: bool
    # Name of the validator method that is looked up (via getattr) on the
    # validator instance and called with the target configuration.
    validation_function: str
    # Evidence items listed in the result of a manual-review control.
    evidence_required: List[str]

class ComplianceValidator:
    """Run the automated controls of one compliance framework against a config.

    Every ``ComplianceControl`` names its check in ``validation_function``;
    the check is looked up on this instance by name and called with the
    target configuration. A check may return either a plain truth value or a
    dict of the form ``{'passed': bool, 'details': {...}}``.

    NOTE(review): ``_load_controls`` and ``_calculate_compliance_score`` are
    called below but are not defined in this class as shown, and
    ``EvidenceCollector`` is not defined in this part of the file. They are
    presumably provided elsewhere (subclass, mixin or another module) -
    confirm before relying on this class as-is.
    """

    def __init__(self, framework: "ComplianceFramework"):
        """Bind the validator to ``framework`` and load its controls."""
        self.framework = framework
        self.controls = self._load_controls()
        self.evidence_collector = EvidenceCollector()

    def validate_compliance(self, target_config: Dict) -> Dict[str, Any]:
        """Validate ``target_config`` against every control of the framework.

        Args:
            target_config: Configuration dictionary handed to each validator.

        Returns:
            A dict with the keys ``framework``, ``timestamp`` (timezone-aware
            UTC, ISO 8601), ``controls`` (one result per control id),
            ``summary`` (counters per status, including ``error``) and
            ``compliance_score``.
        """
        summary = {
            'total': 0,
            'passed': 0,
            'failed': 0,
            'not_applicable': 0,
            'manual_review': 0,
            # Controls whose validator was missing or raised; without this
            # bucket they were counted in 'total' but nowhere else.
            'error': 0,
        }
        results = {
            'framework': self.framework.value,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'controls': {},
            'summary': summary,
        }

        for control in self.controls:
            if control.framework != self.framework:
                continue

            summary['total'] += 1

            if not control.automated:
                results['controls'][control.id] = {
                    'status': 'manual_review',
                    'description': control.description,
                    'evidence_required': control.evidence_required,
                }
                summary['manual_review'] += 1
                continue

            result = self._validate_control(control, target_config)
            results['controls'][control.id] = result

            status = result['status']
            if status in ('passed', 'failed', 'not_applicable', 'error'):
                summary[status] += 1

        results['compliance_score'] = self._calculate_compliance_score(results)
        return results

    def _validate_control(self, control: "ComplianceControl", config: Dict) -> Dict:
        """Run the validator named by ``control`` and normalise its outcome.

        Args:
            control: The control to check; ``validation_function`` is the name
                of a method on this instance.
            config: Configuration dictionary passed to the validator.

        Returns:
            ``{'status': 'passed' | 'failed', 'description', 'evidence',
            'details'}`` on a completed check, or
            ``{'status': 'error', 'message'}`` when the validator is missing,
            is not callable, or raises.
        """
        validator_func = getattr(self, control.validation_function, None)

        if not callable(validator_func):
            return {
                'status': 'error',
                'message': f'Validation function {control.validation_function} not found'
            }

        try:
            result = validator_func(config)
            evidence = self.evidence_collector.collect(control.id, config)
        except Exception as e:
            # Boundary handler: one broken check must not abort the whole
            # run, so the failure is reported as an 'error' result instead.
            return {
                'status': 'error',
                'message': str(e)
            }

        if isinstance(result, dict):
            # A non-empty dict is always truthy, so the verdict has to be
            # read from its 'passed' entry rather than from the dict itself.
            passed = bool(result.get('passed', False))
            details = result.get('details', {})
        else:
            passed = bool(result)
            details = {}

        return {
            'status': 'passed' if passed else 'failed',
            'description': control.description,
            'evidence': evidence,
            'details': details
        }

    # PCI DSS specific validations
    def validate_pci_encryption_at_rest(self, config: Dict) -> bool:
        """PCI DSS 3.4 - Encryption at rest for cardholder data.

        Checks every entry under ``config['storage']``: a ``database`` entry
        needs ``encryption.at_rest`` set and an ``encryption.algorithm`` of
        AES-256 or AES-128; a ``file_storage`` entry needs
        ``encryption.enabled`` set. Entries of other types are ignored.

        Returns:
            True when no checked entry violates these rules (including when
            no storage is configured), False otherwise.
        """
        for storage in config.get('storage', {}).values():
            storage_type = storage.get('type')
            # An explicit ``encryption: None`` is treated like a missing block.
            encryption = storage.get('encryption') or {}

            if storage_type == 'database':
                if not encryption.get('at_rest'):
                    return False
                if encryption.get('algorithm') not in ('AES-256', 'AES-128'):
                    return False
            elif storage_type == 'file_storage':
                if not encryption.get('enabled'):
                    return False

        return True

    def validate_pci_key_management(self, config: Dict) -> Dict:
        """PCI DSS 3.5 & 3.6 - Key management procedures.

        Reads ``config['key_management']`` and evaluates each requirement.

        Returns:
            ``{'passed': bool, 'details': {requirement: bool}}``.
        """
        key_mgmt = config.get('key_management', {})

        rotation_days = key_mgmt.get('rotation_period_days', 0)

        requirements = {
            # A missing or non-positive period means no rotation is
            # configured, which must fail rather than pass by default.
            'key_rotation': 0 < rotation_days <= 365,
            'key_storage': key_mgmt.get('storage_type') in ('hsm', 'key_vault'),
            'split_knowledge': bool(key_mgmt.get('split_knowledge_enabled', False)),
            'dual_control': bool(key_mgmt.get('dual_control_enabled', False)),
            'key_strength': key_mgmt.get('minimum_key_strength', 0) >= 2048
        }

        return {
            'passed': all(requirements.values()),
            'details': requirements
        }

    # HIPAA specific validations
    def validate_hipaa_access_controls(self, config: Dict) -> Dict:
        """HIPAA 164.312(a) - Access control requirements.

        Reads ``config['access_control']`` and evaluates each requirement.

        Returns:
            ``{'passed': bool, 'details': {requirement: bool}}``.
        """
        access_control = config.get('access_control', {})

        timeout_minutes = access_control.get('session_timeout_minutes', 0)

        requirements = {
            'unique_user_identification': bool(access_control.get('unique_users', False)),
            # A missing or non-positive timeout is treated as "no automatic
            # logoff configured" and therefore fails.
            'automatic_logoff': 0 < timeout_minutes <= 15,
            'encryption_decryption': bool(access_control.get('data_encryption', False))
        }

        return {
            'passed': all(requirements.values()),
            'details': requirements
        }

    def validate_hipaa_audit_controls(self, config: Dict) -> bool:
        """HIPAA 164.312(b) - Audit controls.

        Returns:
            True when every required event type appears in
            ``config['audit']['logged_events']``, False otherwise.
        """
        required_events = (
            'authentication_attempts',
            'authorization_failures',
            'data_access',
            'data_modifications',
            'administrative_actions',
        )

        # An explicit ``logged_events: None`` is treated like an empty list.
        logged_events = config.get('audit', {}).get('logged_events') or []

        return all(event in logged_events for event in required_events)

# Policy as Code for compliance
class CompliancePolicyEngine:
    """Evaluate deployment configurations against a set of loaded policies.

    NOTE(review): ``_load_policies`` and ``_evaluate_policy`` are called below
    but are not defined in this class as shown; they are presumably provided
    elsewhere (subclass, mixin or another module) - confirm.
    """

    def __init__(self, policies_dir: str):
        """Load the policies found under ``policies_dir``."""
        self.policies = self._load_policies(policies_dir)

    def evaluate_deployment(self, deployment_config: Dict) -> Dict:
        """Evaluate a deployment against every loaded policy.

        Args:
            deployment_config: Configuration passed to each policy evaluation.

        Returns:
            ``{'compliant': bool, 'violations': list, 'warnings': list}``.
            ``compliant`` is False as soon as one policy evaluation reports
            non-compliance; violations are collected only from those
            evaluations, warnings from all of them.

        Raises:
            KeyError: If a policy evaluation has no ``'compliant'`` entry.
        """
        results = {
            'compliant': True,
            'violations': [],
            'warnings': []
        }

        for policy in self.policies:
            evaluation = self._evaluate_policy(policy, deployment_config)

            if not evaluation['compliant']:
                results['compliant'] = False
                # A non-compliant evaluation without a violations list still
                # marks the deployment non-compliant instead of raising.
                results['violations'].extend(evaluation.get('violations', []))

            results['warnings'].extend(evaluation.get('warnings', []))

        return results

    def generate_compliance_report(self, evaluation_results: Dict) -> str:
        """Render evaluation results as a human-readable Markdown report.

        Args:
            evaluation_results: Dict with a ``'compliant'`` flag and optional
                ``'violations'`` and ``'warnings'`` lists. Each violation needs
                the keys ``control``, ``description``, ``severity`` and
                ``remediation``; each warning needs ``control`` and
                ``description``.

        Returns:
            The report as a single newline-joined string. The "Generated"
            line carries a timezone-aware UTC timestamp.

        Raises:
            KeyError: If ``'compliant'`` or a required violation/warning
                field is missing.
        """
        # Missing or None lists are treated as empty so that a minimal
        # result dict still renders.
        violations = evaluation_results.get('violations') or []
        warnings = evaluation_results.get('warnings') or []

        report = [
            "# Compliance Evaluation Report",
            f"Generated: {datetime.now(timezone.utc).isoformat()}",
            "",
        ]

        if evaluation_results['compliant']:
            report.append("✅ **Status: COMPLIANT**")
        else:
            report.append("❌ **Status: NON-COMPLIANT**")

        if violations:
            report.append("\n## Violations")
            for violation in violations:
                report.append(f"- **{violation['control']}**: {violation['description']}")
                report.append(f"  - Severity: {violation['severity']}")
                report.append(f"  - Remediation: {violation['remediation']}")

        if warnings:
            report.append("\n## Warnings")
            for warning in warnings:
                report.append(f"- **{warning['control']}**: {warning['description']}")

        return "\n".join(report)