Infrastructure as Code Security
Infrastructure as Code Security
Infrastructure as Code (IaC) enables version-controlled, repeatable infrastructure deployment. For containers, this includes Kubernetes manifests, Helm charts, and Docker Compose files. IaC security scanning prevents misconfiguration vulnerabilities before deployment. Security policies can be enforced consistently across all environments through IaC scanning.
IaC security tools examine configurations for security best practices and compliance requirements. They can detect overly permissive security policies, missing encryption settings, and exposed secrets. Integration with version control systems enables security scanning during code review. This early detection prevents security misconfigurations from reaching production.
# Example: IaC security scanner for Kubernetes manifests
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
class K8sSecurityScanner:
    """Static security scanner for Kubernetes manifests.

    The scanner loads a policy mapping from a YAML file and checks manifest
    documents against it. Violations accumulate in ``self.violations`` across
    every ``scan_manifest`` call, so a single instance can scan many files and
    then produce one combined report with ``generate_report``.

    Policy keys read by the scanner (all optional):
        allowed_registries: list of registry/repository prefixes images may
            come from. When the key is absent the registry check is skipped.
        dangerous_capabilities: list of Linux capabilities that must not be
            added to a container.
        policies: mapping of violation code -> extra metadata that is attached
            to each reported violation.
    """

    # Substrings of an environment variable name that hint at a secret value.
    _SECRET_NAME_HINTS = ('password', 'key', 'token', 'secret')

    def __init__(self, policy_file: str):
        """Load the policy mapping from ``policy_file`` (a YAML document)."""
        with open(policy_file, 'r', encoding='utf-8') as f:
            # An empty policy file parses to None; normalise to a mapping so
            # later ``.get`` lookups cannot fail.
            self.policies = yaml.safe_load(f) or {}
        self.violations = []

    def scan_manifest(self, manifest_path: str) -> List[Dict[str, Any]]:
        """Scan every document in a (possibly multi-document) YAML manifest.

        Documents whose ``kind`` has no handler are ignored.

        Returns:
            The scanner's cumulative violation list (not only the violations
            found in this file).
        """
        handlers = {
            'Pod': self._scan_pod,
            'Deployment': self._scan_deployment,
            'Service': self._scan_service,
            'NetworkPolicy': self._scan_network_policy,
            'Role': self._scan_rbac,
            'ClusterRole': self._scan_rbac,
        }
        with open(manifest_path, 'r', encoding='utf-8') as f:
            for doc in yaml.safe_load_all(f):
                # Skip empty documents and non-mapping documents (e.g. a bare
                # list), which cannot be Kubernetes objects.
                if not isinstance(doc, dict):
                    continue
                kind = doc.get('kind', '')
                handler = handlers.get(kind) if isinstance(kind, str) else None
                if handler is not None:
                    handler(doc)
        return self.violations

    def _scan_pod(self, pod: Dict[str, Any]):
        """Scan a Pod document."""
        metadata = pod.get('metadata') or {}
        self._scan_pod_spec(
            pod.get('spec') or {},
            metadata.get('name') or 'unknown',
        )

    def _scan_deployment(self, deployment: Dict[str, Any]):
        """Scan the pod template embedded in a Deployment document.

        Violations are reported against the Deployment's own name because the
        pod template usually carries no name of its own.
        """
        metadata = deployment.get('metadata') or {}
        template = (deployment.get('spec') or {}).get('template') or {}
        self._scan_pod_spec(
            template.get('spec') or {},
            metadata.get('name') or 'unknown',
        )

    def _scan_service(self, service: Dict[str, Any]):
        """Handle a Service document.

        No Service-level checks are implemented. The handler exists so that
        the dispatch in ``scan_manifest`` has a target for ``kind: Service``
        instead of failing on a missing method.
        """
        return None

    def _scan_pod_spec(self, spec: Dict[str, Any], name: str):
        """Scan a pod spec (from a Pod or a workload's pod template).

        Args:
            spec: The pod spec mapping.
            name: Name of the owning resource, used in violation reports.
        """
        if 'securityContext' not in spec:
            self._add_violation(
                'POD001',
                'Pod missing securityContext',
                name,
                'high'
            )
        pod_security_context = spec.get('securityContext') or {}
        # Regular containers first (keeps the historical report order), then
        # init containers, which run with the same kind of privileges.
        for field in ('containers', 'initContainers'):
            for container in spec.get(field) or []:
                self._scan_container(container, name, pod_security_context)

    @staticmethod
    def _has_floating_tag(image: str) -> bool:
        """Return True when ``image`` uses a ``latest`` tag or no tag at all.

        Only the last path segment is inspected for a tag, so the port of a
        registry such as ``localhost:5000/app`` is not mistaken for one. An
        untagged image that is pinned by digest (``name@sha256:...``) is not
        considered floating.
        """
        name_part, _, digest = image.partition('@')
        last_segment = name_part.rsplit('/', 1)[-1]
        _, separator, tag = last_segment.partition(':')
        if not separator:
            return not digest
        return not tag or tag.startswith('latest')

    @staticmethod
    def _from_registry(image: str, registry: str) -> bool:
        """Return True when ``image`` lives under the ``registry`` prefix.

        The match is anchored at the start of the image reference and ends on
        a path boundary, so ``evil.io/gcr.io/app`` does not match ``gcr.io``.
        """
        prefix = str(registry).rstrip('/')
        return bool(prefix) and image.startswith(prefix + '/')

    def _scan_container(
        self,
        container: Dict[str, Any],
        parent_name: str,
        pod_security_context: Optional[Dict[str, Any]] = None,
    ):
        """Scan a single container specification.

        Args:
            container: The container mapping from a pod spec.
            parent_name: Name of the owning resource, used in reports.
            pod_security_context: The pod-level securityContext, if any. Its
                ``runAsNonRoot`` value applies when the container does not set
                its own.
        """
        name = container.get('name', 'unknown')
        resource = f"{parent_name}/{name}"

        # Check image source
        image = container.get('image') or ''
        if self._has_floating_tag(image):
            self._add_violation(
                'IMG001',
                'Container using latest or untagged image',
                resource,
                'medium'
            )

        # Check for registry whitelist (skipped when no list is configured)
        allowed_registries = self.policies.get('allowed_registries')
        if allowed_registries is not None and not any(
            self._from_registry(image, registry)
            for registry in allowed_registries
        ):
            self._add_violation(
                'IMG002',
                'Container using non-whitelisted registry',
                resource,
                'high'
            )

        # Security context checks
        security_context = container.get('securityContext') or {}
        inherited_non_root = (pod_security_context or {}).get(
            'runAsNonRoot', False
        )
        # A container-level value overrides the pod-level one.
        if not security_context.get('runAsNonRoot', inherited_non_root):
            self._add_violation(
                'SEC001',
                'Container not configured to run as non-root',
                resource,
                'high'
            )
        if security_context.get('privileged', False):
            self._add_violation(
                'SEC002',
                'Container running in privileged mode',
                resource,
                'critical'
            )
        if not security_context.get('readOnlyRootFilesystem', False):
            self._add_violation(
                'SEC003',
                'Container filesystem not read-only',
                resource,
                'medium'
            )

        # Capabilities check
        capabilities = security_context.get('capabilities') or {}
        dangerous = self.policies.get('dangerous_capabilities') or []
        for cap in capabilities.get('add') or []:
            if cap in dangerous:
                self._add_violation(
                    'CAP001',
                    f'Container adding dangerous capability: {cap}',
                    resource,
                    'high'
                )

        # Resource limits
        if 'limits' not in (container.get('resources') or {}):
            self._add_violation(
                'RES001',
                'Container missing resource limits',
                resource,
                'medium'
            )

        # Environment variables check for secrets: a secret-looking name with
        # a literal ``value`` (rather than ``valueFrom``) is reported.
        for env in container.get('env') or []:
            env_name = env.get('name') or ''
            lowered = env_name.lower()
            if 'value' in env and any(
                hint in lowered for hint in self._SECRET_NAME_HINTS
            ):
                self._add_violation(
                    'SEC004',
                    'Potential secret in environment variable',
                    f"{resource}/{env_name}",
                    'critical'
                )

    def _scan_network_policy(self, policy: Dict[str, Any]):
        """Flag NetworkPolicy rules that allow all traffic.

        An empty rule (``{}``) in ``ingress`` or ``egress`` matches every peer
        and port, i.e. it allows all traffic in that direction. A policy with
        no rules at all is a default-deny policy and is not reported.
        """
        spec = policy.get('spec') or {}
        name = (policy.get('metadata') or {}).get('name', 'unknown')
        for direction in ('ingress', 'egress'):
            if any(not rule for rule in spec.get(direction) or []):
                self._add_violation(
                    'NET001',
                    f'NetworkPolicy with empty {direction} rule '
                    '(allows all traffic)',
                    name,
                    'high'
                )

    def _scan_rbac(self, role: Dict[str, Any]):
        """Flag Role/ClusterRole rules that use wildcard verbs or resources."""
        name = (role.get('metadata') or {}).get('name', 'unknown')
        # ``rules`` may be present but null; treat it as empty.
        for rule in role.get('rules') or []:
            if '*' in (rule.get('verbs') or []):
                self._add_violation(
                    'RBAC001',
                    'RBAC rule with wildcard verbs',
                    name,
                    'high'
                )
            if '*' in (rule.get('resources') or []):
                self._add_violation(
                    'RBAC002',
                    'RBAC rule with wildcard resources',
                    name,
                    'high'
                )

    def _add_violation(self, code: str, message: str, resource: str, severity: str):
        """Record a violation, attaching any policy metadata for ``code``."""
        self.violations.append({
            'code': code,
            'message': message,
            'resource': resource,
            'severity': severity,
            'policy': (self.policies.get('policies') or {}).get(code, {})
        })

    def generate_report(self) -> Dict[str, Any]:
        """Summarise all recorded violations.

        Returns:
            A mapping with per-severity counts (``summary``), the total number
            of violations, the violation list itself, and ``passed`` which is
            True only when no violation was recorded.
        """
        severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
        for violation in self.violations:
            severity = violation['severity']
            # Tolerate severities outside the four predefined levels.
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
        return {
            'summary': severity_counts,
            'total_violations': len(self.violations),
            'violations': self.violations,
            'passed': not self.violations
        }
# Usage in CI/CD

# Whole words in a path component that mark a manifest as test material.
TEST_PATH_WORDS = frozenset({'test', 'tests', 'testing', 'testdata'})


def is_test_manifest(path: Path) -> bool:
    """Return True when a component of ``path`` marks it as a test file.

    Each path component is split on ``-``, ``_`` and ``.`` and compared word
    by word, so ``tests/pod.yaml`` and ``test_pod.yaml`` are recognised while
    ``latest/deployment.yaml`` is not (a plain substring test on ``'test'``
    would silently exclude it from the scan).
    """
    for part in path.parts:
        words = part.lower().replace('-', '_').replace('.', '_').split('_')
        if any(word in TEST_PATH_WORDS for word in words):
            return True
    return False


def main() -> None:
    """Scan every non-test ``*.yaml`` file below the current directory.

    Prints the JSON report to stdout and exits with status 1 when at least
    one critical violation was found.
    """
    scanner = K8sSecurityScanner('security-policies.yaml')

    # Scan all manifests; sorting keeps the report order stable between runs.
    for manifest_file in sorted(Path('.').glob('**/*.yaml')):
        if manifest_file.is_file() and not is_test_manifest(manifest_file):
            scanner.scan_manifest(str(manifest_file))

    # Generate report
    report = scanner.generate_report()

    # Output results
    print(json.dumps(report, indent=2))

    # Exit with error if critical violations
    if report['summary']['critical'] > 0:
        sys.exit(1)


if __name__ == '__main__':
    main()