Infrastructure as Code Security

Infrastructure as Code Security

Infrastructure as Code (IaC) enables version-controlled, repeatable infrastructure deployment. For containers, this includes Kubernetes manifests, Helm charts, and Docker Compose files. IaC security scanning prevents misconfiguration vulnerabilities before deployment. Security policies can be enforced consistently across all environments through IaC scanning.

IaC security tools examine configurations for security best practices and compliance requirements. They can detect overly permissive security policies, missing encryption settings, and exposed secrets. Integration with version control systems enables security scanning during code review. This early detection prevents security misconfigurations from reaching production.

# Example: IaC security scanner for Kubernetes manifests
import yaml
import json
import sys
from pathlib import Path
from typing import List, Dict, Any

class K8sSecurityScanner:
    def __init__(self, policy_file: str):
        with open(policy_file, 'r') as f:
            self.policies = yaml.safe_load(f)
        self.violations = []
        
    def scan_manifest(self, manifest_path: str) -> List[Dict[str, Any]]:
        """Scan Kubernetes manifest for security issues"""
        with open(manifest_path, 'r') as f:
            # Handle multi-document YAML
            documents = yaml.safe_load_all(f)
            
        for doc in documents:
            if not doc:
                continue
                
            kind = doc.get('kind', '')
            
            # Route to appropriate scanner
            if kind == 'Pod':
                self._scan_pod(doc)
            elif kind == 'Deployment':
                self._scan_deployment(doc)
            elif kind == 'Service':
                self._scan_service(doc)
            elif kind == 'NetworkPolicy':
                self._scan_network_policy(doc)
            elif kind in ['Role', 'ClusterRole']:
                self._scan_rbac(doc)
                
        return self.violations
    
    def _scan_pod(self, pod: Dict[str, Any]):
        """Security scanning for Pod specifications"""
        metadata = pod.get('metadata', {})
        spec = pod.get('spec', {})
        
        # Check for security context
        if 'securityContext' not in spec:
            self._add_violation(
                'POD001',
                'Pod missing securityContext',
                metadata.get('name', 'unknown'),
                'high'
            )
        
        # Scan containers
        for container in spec.get('containers', []):
            self._scan_container(container, metadata.get('name'))
    
    def _scan_container(self, container: Dict[str, Any], parent_name: str):
        """Security scanning for container specifications"""
        name = container.get('name', 'unknown')
        
        # Check image source
        image = container.get('image', '')
        if ':latest' in image or ':' not in image:
            self._add_violation(
                'IMG001',
                'Container using latest or untagged image',
                f"{parent_name}/{name}",
                'medium'
            )
        
        # Check for registry whitelist
        if not any(registry in image for registry in self.policies['allowed_registries']):
            self._add_violation(
                'IMG002',
                'Container using non-whitelisted registry',
                f"{parent_name}/{name}",
                'high'
            )
        
        # Security context checks
        security_context = container.get('securityContext', {})
        
        if not security_context.get('runAsNonRoot', False):
            self._add_violation(
                'SEC001',
                'Container not configured to run as non-root',
                f"{parent_name}/{name}",
                'high'
            )
        
        if security_context.get('privileged', False):
            self._add_violation(
                'SEC002',
                'Container running in privileged mode',
                f"{parent_name}/{name}",
                'critical'
            )
        
        if not security_context.get('readOnlyRootFilesystem', False):
            self._add_violation(
                'SEC003',
                'Container filesystem not read-only',
                f"{parent_name}/{name}",
                'medium'
            )
        
        # Capabilities check
        capabilities = security_context.get('capabilities', {})
        if 'add' in capabilities:
            for cap in capabilities['add']:
                if cap in self.policies['dangerous_capabilities']:
                    self._add_violation(
                        'CAP001',
                        f'Container adding dangerous capability: {cap}',
                        f"{parent_name}/{name}",
                        'high'
                    )
        
        # Resource limits
        if 'resources' not in container or 'limits' not in container['resources']:
            self._add_violation(
                'RES001',
                'Container missing resource limits',
                f"{parent_name}/{name}",
                'medium'
            )
        
        # Environment variables check for secrets
        for env in container.get('env', []):
            if any(secret_pattern in env.get('name', '').lower() 
                   for secret_pattern in ['password', 'key', 'token', 'secret']):
                if 'value' in env:  # Direct value instead of secretRef
                    self._add_violation(
                        'SEC004',
                        'Potential secret in environment variable',
                        f"{parent_name}/{name}/{env['name']}",
                        'critical'
                    )
    
    def _scan_network_policy(self, policy: Dict[str, Any]):
        """Check for network segmentation"""
        spec = policy.get('spec', {})
        
        # Check if policy is too permissive
        if not spec.get('ingress') and not spec.get('egress'):
            self._add_violation(
                'NET001',
                'NetworkPolicy with no rules (allows all traffic)',
                policy.get('metadata', {}).get('name', 'unknown'),
                'high'
            )
    
    def _scan_rbac(self, role: Dict[str, Any]):
        """Check RBAC configurations"""
        rules = role.get('rules', [])
        
        for rule in rules:
            # Check for wildcard permissions
            if '*' in rule.get('verbs', []):
                self._add_violation(
                    'RBAC001',
                    'RBAC rule with wildcard verbs',
                    role.get('metadata', {}).get('name', 'unknown'),
                    'high'
                )
            
            if '*' in rule.get('resources', []):
                self._add_violation(
                    'RBAC002',
                    'RBAC rule with wildcard resources',
                    role.get('metadata', {}).get('name', 'unknown'),
                    'high'
                )
    
    def _add_violation(self, code: str, message: str, resource: str, severity: str):
        """Add security violation"""
        self.violations.append({
            'code': code,
            'message': message,
            'resource': resource,
            'severity': severity,
            'policy': self.policies.get('policies', {}).get(code, {})
        })
    
    def generate_report(self) -> Dict[str, Any]:
        """Generate security report"""
        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
        
        for violation in self.violations:
            severity_counts[violation['severity']] += 1
        
        return {
            'summary': severity_counts,
            'total_violations': len(self.violations),
            'violations': self.violations,
            'passed': len(self.violations) == 0
        }

# Usage in CI/CD
if __name__ == '__main__':
    scanner = K8sSecurityScanner('security-policies.yaml')
    
    # Scan all manifests
    for manifest_file in Path('.').glob('**/*.yaml'):
        if 'test' not in str(manifest_file):  # Skip test files
            scanner.scan_manifest(str(manifest_file))
    
    # Generate report
    report = scanner.generate_report()
    
    # Output results
    print(json.dumps(report, indent=2))
    
    # Exit with error if critical violations
    if report['summary']['critical'] > 0:
        sys.exit(1)