Implementing Continuous Compliance

Implementing Continuous Compliance

Continuous compliance transforms periodic audits into ongoing validation by leveraging the declarative nature and automation capabilities of Infrastructure as Code (IaC). Instead of an annual assessment that discovers problems months after they were introduced, continuous compliance identifies issues as soon as they appear. This approach reduces both compliance risk and remediation costs while providing real-time visibility into compliance posture.

Compliance as Code encodes regulatory requirements into executable policies that automatically validate IaC configurations. These policies run throughout the development lifecycle - during coding, in CI/CD pipelines, and continuously in production. Policy violations trigger immediate alerts and can block non-compliant deployments.

# Continuous Compliance Framework Implementation
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Any

import boto3
import yaml

class ComplianceFramework(Enum):
    """Compliance frameworks that controls and scans can be associated with.

    Each member's value is a lowercase identifier string; scan results
    report the framework using this value.
    """
    PCI_DSS = "pci-dss"
    HIPAA = "hipaa"
    SOC2 = "soc2"
    GDPR = "gdpr"
    NIST_800_53 = "nist-800-53"

@dataclass
class ComplianceControl:
    """A single control belonging to one compliance framework."""
    # Framework this control belongs to; scans select controls by comparing
    # this field with the requested framework.
    framework: ComplianceFramework
    # Identifier used as the key of this control's entry in scan results.
    control_id: str
    # Human-readable description; copied into the result entry of manual controls.
    description: str
    # True when the engine evaluates the control during a scan; controls with
    # False are reported with status 'MANUAL' instead.
    automated: bool
    # Name of the validation routine for this control -- presumably a key of
    # ContinuousComplianceEngine.validators; TODO confirm against the evaluator.
    validation_func: str
    # Evidence items listed in the result entry of manual controls.
    evidence_required: List[str]

class ContinuousComplianceEngine:
    def __init__(self):
        """Load the control catalogue and register the validator callables.

        ``self.validators`` maps each validator name to the bound method of
        the same name prefixed with an underscore.
        """
        self.controls = self._load_controls()

        # Registry keys, in registration order; each resolves to `self._<key>`.
        validator_names = (
            'validate_encryption',
            'validate_access_control',
            'validate_logging',
            'validate_network_isolation',
            'validate_data_retention',
        )
        self.validators = {
            name: getattr(self, f'_{name}') for name in validator_names
        }
        
    def scan_infrastructure(self, framework: ComplianceFramework) -> Dict[str, Any]:
        """Scan infrastructure for compliance with the specified framework.

        Every control registered for ``framework`` is counted in the summary.
        Automated controls are evaluated through ``_evaluate_control``; all
        other controls are reported with status ``'MANUAL'`` together with
        their description and required evidence.

        Args:
            framework: Framework whose controls should be evaluated.

        Returns:
            A dict with the keys ``framework`` (the framework's value),
            ``scan_time`` (ISO 8601 timestamp in UTC), ``controls``
            (per-control result keyed by control id), ``summary``
            (``total``/``passed``/``failed``/``manual`` counters) and
            ``compliance_score`` (as returned by ``_calculate_score``).
        """
        summary = {
            'total': 0,
            'passed': 0,
            'failed': 0,
            'manual': 0
        }
        results = {
            'framework': framework.value,
            # Timezone-aware UTC: the timestamp carries an explicit offset, and
            # the deprecated naive `datetime.utcnow()` is avoided.
            'scan_time': datetime.now(timezone.utc).isoformat(),
            'controls': {},
            'summary': summary
        }

        applicable_controls = [c for c in self.controls if c.framework == framework]

        for control in applicable_controls:
            summary['total'] += 1

            if not control.automated:
                # Cannot be checked by the engine: surface what is needed
                # for a manual review instead.
                results['controls'][control.control_id] = {
                    'status': 'MANUAL',
                    'description': control.description,
                    'evidence_required': control.evidence_required
                }
                summary['manual'] += 1
                continue

            result = self._evaluate_control(control)
            results['controls'][control.control_id] = result

            # Any status other than 'PASS' counts as a failure.
            if result['status'] == 'PASS':
                summary['passed'] += 1
            else:
                summary['failed'] += 1

        results['compliance_score'] = self._calculate_score(results)
        return results
    
    def _validate_encryption(self, resource_type: str, resource_config: Dict) -> bool:
        """Validate encryption requirements across resource types."""
        encryption_requirements = {
            'aws_s3_bucket': lambda c: c.get('server_side_encryption_configuration'),
            'aws_rds_instance': lambda c: c.get('storage_encrypted', False),
            'aws_ebs_volume': lambda c: c.get('encrypted', False),
            'aws_dynamodb_table': lambda c: c.get('server_side_encryption', {}).get('enabled', False)
        }
        
        validator = encryption_requirements.get(resource_type)
        return validator(resource_config) if validator else True
    
    def _validate_logging(self, resource_type: str, resource_config: Dict) -> bool:
        """Validate logging requirements."""
        logging_requirements = {
            'aws_s3_bucket': lambda c: 'logging_configuration' in c,
            'aws_rds_instance': lambda c: bool(c.get('enabled_cloudwatch_logs_exports')),
            'aws_lb': lambda c: 'access_logs' in c and c['access_logs'].get('enabled', False),
            'aws_api_gateway_stage': lambda c: 'access_log_settings' in c
        }
        
        validator = logging_requirements.get(resource_type)
        return validator(resource_config) if validator else True
    
    def generate_compliance_report(self, scan_results: Dict[str, Any]) -> str:
        """Generate detailed compliance report from scan results."""
        report = f"""
# Compliance Scan Report

**Framework**: {scan_results['framework']}  
**Scan Date**: {scan_results['scan_time']}  
**Compliance Score**: {scan_results['compliance_score']}%