Implementing Continuous Compliance
Continuous compliance transforms periodic audits into ongoing validation, leveraging IaC's declarative nature and automation capabilities. Rather than relying on annual assessments that surface months-old problems, continuous compliance identifies issues as soon as they are introduced. This approach reduces both compliance risk and remediation costs while providing real-time visibility into compliance posture.
Compliance as Code encodes regulatory requirements into executable policies that automatically validate IaC configurations. These policies run throughout the development lifecycle - during coding, in CI/CD pipelines, and continuously in production. Policy violations trigger immediate alerts and can block non-compliant deployments.
# Continuous Compliance Framework Implementation
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List

import boto3
import yaml
class ComplianceFramework(Enum):
    """Compliance frameworks a control can belong to.

    The member value is the identifier written to scan results
    (``results['framework']``) and, presumably, to reports.
    """
    PCI_DSS = "pci-dss"
    HIPAA = "hipaa"
    SOC2 = "soc2"
    GDPR = "gdpr"
    NIST_800_53 = "nist-800-53"
@dataclass
class ComplianceControl:
    """One control of a compliance framework, as loaded by the engine.

    Attributes are documented inline; see ``ContinuousComplianceEngine``
    for how they are consumed during a scan.
    """
    framework: ComplianceFramework  # framework this control is evaluated under
    control_id: str  # key of this control's entry in ``results['controls']``
    description: str  # shown in the MANUAL entry of the scan results
    automated: bool  # True: evaluated by a validator; False: reported as MANUAL
    validation_func: str  # presumably a key of ``ContinuousComplianceEngine.validators`` — confirm against _evaluate_control
    evidence_required: List[str]  # evidence an auditor must collect for a manual control
class ContinuousComplianceEngine:
def __init__(self):
    """Load the control catalogue and register the automated validators by name."""
    self.controls = self._load_controls()
    validator_names = (
        'validate_encryption',
        'validate_access_control',
        'validate_logging',
        'validate_network_isolation',
        'validate_data_retention',
    )
    # Each public validator name maps to the underscore-prefixed method implementing it.
    self.validators = {name: getattr(self, f'_{name}') for name in validator_names}
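def scan_infrastructure(self, framework: ComplianceFramework) -> Dict[str, Any]:
    """Evaluate every control of *framework* and return the scan results.

    Args:
        framework: The framework whose controls are evaluated; controls of
            other frameworks are skipped.

    Returns:
        A dict with keys ``framework`` (the enum value), ``scan_time``
        (timezone-aware UTC ISO-8601 string), ``controls`` (one entry per
        ``control_id``), ``summary`` (``total``/``passed``/``failed``/``manual``
        counts) and ``compliance_score`` (whatever ``_calculate_score`` returns
        for the results).
    """
    results: Dict[str, Any] = {
        'framework': framework.value,
        # Timezone-aware UTC so the timestamp is unambiguous in reports.
        'scan_time': datetime.now(timezone.utc).isoformat(),
        'controls': {},
        'summary': {'total': 0, 'passed': 0, 'failed': 0, 'manual': 0},
    }
    summary = results['summary']
    for control in self.controls:
        if control.framework != framework:
            continue
        summary['total'] += 1
        if not control.automated:
            # Manual controls are never PASS/FAIL; the entry lists what an auditor must collect.
            results['controls'][control.control_id] = {
                'status': 'MANUAL',
                'description': control.description,
                'evidence_required': control.evidence_required,
            }
            summary['manual'] += 1
            continue
        result = self._evaluate_control(control)
        results['controls'][control.control_id] = result
        # Anything other than an explicit PASS (FAIL, ERROR, ...) counts as failed.
        if result['status'] == 'PASS':
            summary['passed'] += 1
        else:
            summary['failed'] += 1
    results['compliance_score'] = self._calculate_score(results)
    return results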
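def _validate_encryption(self, resource_type: str, resource_config: Dict) -> bool:
    """Return whether *resource_config* passes the encryption-at-rest check for *resource_type*.

    Args:
        resource_type: Resource type name such as ``aws_s3_bucket``.
        resource_config: The resource's configuration mapping.

    Returns:
        True when the resource is configured as encrypted, or when no check
        exists for ``resource_type`` (unlisted types pass by default).
    """
    def _dynamodb_encrypted(c: Dict) -> bool:
        # server_side_encryption may be absent or explicitly null; both mean unencrypted.
        sse = c.get('server_side_encryption') or {}
        return bool(sse.get('enabled', False))

    # Every check returns a real bool so callers can compare with ``is True``.
    encryption_requirements = {
        'aws_s3_bucket': lambda c: bool(c.get('server_side_encryption_configuration')),
        'aws_rds_instance': lambda c: bool(c.get('storage_encrypted', False)),
        'aws_ebs_volume': lambda c: bool(c.get('encrypted', False)),
        'aws_dynamodb_table': _dynamodb_encrypted,
    }
    validator = encryption_requirements.get(resource_type)
    if validator is None:
        # Fail-open: resource types without a check are reported as compliant.
        # Add an entry above before relying on this validator for a new type.
        return True
    return validator(resource_config)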
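def _validate_logging(self, resource_type: str, resource_config: Dict) -> bool:
    """Return whether *resource_config* passes the logging check for *resource_type*.

    Args:
        resource_type: Resource type name such as ``aws_s3_bucket``.
        resource_config: The resource's configuration mapping.

    Returns:
        True when logging is configured, or when no check exists for
        ``resource_type`` (unlisted types pass by default).
    """
    def _lb_access_logs_enabled(c: Dict) -> bool:
        # access_logs may be absent or explicitly null; both mean access logging is off.
        access_logs = c.get('access_logs') or {}
        return bool(access_logs.get('enabled', False))

    logging_requirements = {
        # Presence-only checks: the key's value is not inspected.
        'aws_s3_bucket': lambda c: 'logging_configuration' in c,
        'aws_api_gateway_stage': lambda c: 'access_log_settings' in c,
        # An empty export list means no log types are exported.
        'aws_rds_instance': lambda c: bool(c.get('enabled_cloudwatch_logs_exports')),
        'aws_lb': _lb_access_logs_enabled,
    }
    validator = logging_requirements.get(resource_type)
    if validator is None:
        # Fail-open: resource types without a check are reported as compliant.
        # Add an entry above before relying on this validator for a new type.
        return True
    return validator(resource_config)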
def generate_compliance_report(self, scan_results: Dict[str, Any]) -> str:
"""Generate detailed compliance report from scan results."""
report = f"""
# Compliance Scan Report
**Framework**: {scan_results['framework']}
**Scan Date**: {scan_results['scan_time']}
**Compliance Score**: {scan_results['compliance_score']}%