Automated Compliance Validation
Automated Compliance Validation
Compliance requirements often translate poorly to automated checks, but modern tools can validate many compliance controls automatically. Configuration standards, security headers, encryption requirements, and access controls can all be verified through automated testing. The challenge lies in translating regulatory language into technical validations.
#!/usr/bin/env python3
# compliance_validator.py - Automated compliance checking
import json
import yaml
import re
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
class ComplianceFramework(Enum):
PCI_DSS = "pci_dss"
HIPAA = "hipaa"
SOC2 = "soc2"
GDPR = "gdpr"
NIST_800_53 = "nist_800_53"
@dataclass
class ComplianceControl:
id: str
description: str
framework: ComplianceFramework
automated: bool
validation_function: str
evidence_required: List[str]
class ComplianceValidator:
def __init__(self, framework: ComplianceFramework):
self.framework = framework
self.controls = self._load_controls()
self.evidence_collector = EvidenceCollector()
def validate_compliance(self, target_config: Dict) -> Dict[str, Any]:
"""Validate compliance against framework requirements"""
results = {
'framework': self.framework.value,
'timestamp': datetime.utcnow().isoformat(),
'controls': {},
'summary': {
'total': 0,
'passed': 0,
'failed': 0,
'not_applicable': 0,
'manual_review': 0
}
}
for control in self.controls:
if control.framework != self.framework:
continue
results['summary']['total'] += 1
if control.automated:
result = self._validate_control(control, target_config)
results['controls'][control.id] = result
if result['status'] == 'passed':
results['summary']['passed'] += 1
elif result['status'] == 'failed':
results['summary']['failed'] += 1
elif result['status'] == 'not_applicable':
results['summary']['not_applicable'] += 1
else:
results['controls'][control.id] = {
'status': 'manual_review',
'description': control.description,
'evidence_required': control.evidence_required
}
results['summary']['manual_review'] += 1
results['compliance_score'] = self._calculate_compliance_score(results)
return results
def _validate_control(self, control: ComplianceControl, config: Dict) -> Dict:
"""Validate individual control"""
validator_func = getattr(self, control.validation_function, None)
if not validator_func:
return {
'status': 'error',
'message': f'Validation function {control.validation_function} not found'
}
try:
result = validator_func(config)
evidence = self.evidence_collector.collect(control.id, config)
return {
'status': 'passed' if result else 'failed',
'description': control.description,
'evidence': evidence,
'details': result.get('details', {}) if isinstance(result, dict) else {}
}
except Exception as e:
return {
'status': 'error',
'message': str(e)
}
# PCI DSS specific validations
def validate_pci_encryption_at_rest(self, config: Dict) -> bool:
"""PCI DSS 3.4 - Encryption at rest for cardholder data"""
storage_configs = config.get('storage', {})
for storage in storage_configs.values():
if storage.get('type') == 'database':
if not storage.get('encryption', {}).get('at_rest'):
return False
if storage.get('encryption', {}).get('algorithm') not in ['AES-256', 'AES-128']:
return False
if storage.get('type') == 'file_storage':
if not storage.get('encryption', {}).get('enabled'):
return False
return True
def validate_pci_key_management(self, config: Dict) -> Dict:
"""PCI DSS 3.5 & 3.6 - Key management procedures"""
key_mgmt = config.get('key_management', {})
requirements = {
'key_rotation': key_mgmt.get('rotation_period_days', 0) <= 365,
'key_storage': key_mgmt.get('storage_type') in ['hsm', 'key_vault'],
'split_knowledge': key_mgmt.get('split_knowledge_enabled', False),
'dual_control': key_mgmt.get('dual_control_enabled', False),
'key_strength': key_mgmt.get('minimum_key_strength', 0) >= 2048
}
return {
'passed': all(requirements.values()),
'details': requirements
}
# HIPAA specific validations
def validate_hipaa_access_controls(self, config: Dict) -> Dict:
"""HIPAA 164.312(a) - Access control requirements"""
access_control = config.get('access_control', {})
requirements = {
'unique_user_identification': access_control.get('unique_users', False),
'automatic_logoff': access_control.get('session_timeout_minutes', 0) <= 15,
'encryption_decryption': access_control.get('data_encryption', False)
}
return {
'passed': all(requirements.values()),
'details': requirements
}
def validate_hipaa_audit_controls(self, config: Dict) -> bool:
"""HIPAA 164.312(b) - Audit controls"""
audit_config = config.get('audit', {})
required_events = [
'authentication_attempts',
'authorization_failures',
'data_access',
'data_modifications',
'administrative_actions'
]
logged_events = audit_config.get('logged_events', [])
return all(event in logged_events for event in required_events)
# Policy as Code for compliance
class CompliancePolicyEngine:
def __init__(self, policies_dir: str):
self.policies = self._load_policies(policies_dir)
def evaluate_deployment(self, deployment_config: Dict) -> Dict:
"""Evaluate deployment against compliance policies"""
results = {
'compliant': True,
'violations': [],
'warnings': []
}
for policy in self.policies:
evaluation = self._evaluate_policy(policy, deployment_config)
if not evaluation['compliant']:
results['compliant'] = False
results['violations'].extend(evaluation['violations'])
results['warnings'].extend(evaluation.get('warnings', []))
return results
def generate_compliance_report(self, evaluation_results: Dict) -> str:
"""Generate human-readable compliance report"""
report = []
report.append("# Compliance Evaluation Report")
report.append(f"Generated: {datetime.utcnow().isoformat()}")
report.append("")
if evaluation_results['compliant']:
report.append("✅ **Status: COMPLIANT**")
else:
report.append("❌ **Status: NON-COMPLIANT**")
if evaluation_results['violations']:
report.append("\n## Violations")
for violation in evaluation_results['violations']:
report.append(f"- **{violation['control']}**: {violation['description']}")
report.append(f" - Severity: {violation['severity']}")
report.append(f" - Remediation: {violation['remediation']}")
if evaluation_results['warnings']:
report.append("\n## Warnings")
for warning in evaluation_results['warnings']:
report.append(f"- **{warning['control']}**: {warning['description']}")
return "\n".join(report)