Static Security Analysis for IaC
Static Security Analysis for IaC
Static analysis forms the foundation of IaC security testing by examining code without execution. Modern static analysis tools understand IaC languages deeply, detecting both syntactic issues and semantic vulnerabilities. These tools identify misconfigurations by analyzing resource definitions, parameter values, and relationships between resources.
Multi-tool approaches provide better coverage than relying on single scanners. Different tools excel at detecting different vulnerability types – some focus on cloud-specific misconfigurations while others specialize in compliance validation. Running multiple tools in parallel and aggregating results provides comprehensive security coverage.
#!/usr/bin/env python3
# iac_security_pipeline.py - Orchestrate multiple security tools
import subprocess
import json
import yaml
from pathlib import Path
from typing import Dict, List, Any
import concurrent.futures
from dataclasses import dataclass
@dataclass
class SecurityFinding:
tool: str
severity: str
file: str
line: int
rule: str
message: str
remediation: str
class IaCSecurityPipeline:
def __init__(self, project_dir: str):
self.project_dir = Path(project_dir)
self.tools = {
'tfsec': TfsecScanner(),
'checkov': CheckovScanner(),
'terrascan': TerrascanScanner(),
'kics': KicsScanner(),
'trivy': TrivyScanner()
}
self.findings = []
def run_security_scan(self) -> Dict[str, Any]:
"""Execute all security scanners in parallel."""
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_tool = {
executor.submit(
tool.scan,
self.project_dir
): name
for name, tool in self.tools.items()
}
for future in concurrent.futures.as_completed(future_to_tool):
tool_name = future_to_tool[future]
try:
tool_findings = future.result()
self.findings.extend(tool_findings)
print(f"✓ {tool_name} completed: {len(tool_findings)} findings")
except Exception as e:
print(f"✗ {tool_name} failed: {str(e)}")
return self._aggregate_results()
def _aggregate_results(self) -> Dict[str, Any]:
"""Aggregate and deduplicate findings from all tools."""
# Deduplicate findings
unique_findings = self._deduplicate_findings(self.findings)
# Calculate statistics
stats = {
'total_findings': len(unique_findings),
'by_severity': {},
'by_tool': {},
'by_file': {}
}
for finding in unique_findings:
# By severity
severity = finding.severity.lower()
stats['by_severity'][severity] = stats['by_severity'].get(severity, 0) + 1
# By tool
stats['by_tool'][finding.tool] = stats['by_tool'].get(finding.tool, 0) + 1
# By file
stats['by_file'][finding.file] = stats['by_file'].get(finding.file, 0) + 1
return {
'findings': unique_findings,
'statistics': stats,
'passed': len([f for f in unique_findings if f.severity.lower() in ['critical', 'high']]) == 0
}
def _deduplicate_findings(self, findings: List[SecurityFinding]) -> List[SecurityFinding]:
"""Remove duplicate findings based on file, line, and rule."""
seen = set()
unique = []
for finding in findings:
key = (finding.file, finding.line, finding.rule)
if key not in seen:
seen.add(key)
unique.append(finding)
return unique
def generate_report(self, results: Dict[str, Any], format: str = 'html') -> str:
"""Generate security scan report in requested format."""
if format == 'html':
return self._generate_html_report(results)
elif format == 'json':
return json.dumps(results, default=str, indent=2)
elif format == 'junit':
return self._generate_junit_report(results)
elif format == 'sarif':
return self._generate_sarif_report(results)
def _generate_html_report(self, results: Dict[str, Any]) -> str:
"""Generate HTML report with findings."""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>IaC Security Scan Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.summary { background: #f0f0f0; padding: 15px; border-radius: 5px; }
.critical { color: #d32f2f; }
.high { color: #f57c00; }
.medium { color: #fbc02d; }
.low { color: #388e3c; }
table { border-collapse: collapse; width: 100%; margin-top: 20px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #4CAF50; color: white; }
tr:nth-child(even) { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>Infrastructure as Code Security Scan Report</h1>
<div class="summary">
<h2>Summary</h2>
<p>Total Findings: {total}</p>
<p>Critical: <span class="critical">{critical}</span></p>
<p>High: <span class="high">{high}</span></p>
<p>Medium: <span class="medium">{medium}</span></p>
<p>Low: <span class="low">{low}</span></p>
</div>
<h2>Detailed Findings</h2>
<table>
<tr>
<th>Severity</th>
<th>File</th>
<th>Line</th>
<th>Rule</th>
<th>Message</th>
<th>Tool</th>
</tr>
{findings_rows}
</table>
</body>
</html>
"""
findings_rows = ""
for finding in results['findings']:
findings_rows += f"""
<tr>
<td class="{finding.severity.lower()}">{finding.severity}</td>
<td>{finding.file}</td>
<td>{finding.line}</td>
<td>{finding.rule}</td>
<td>{finding.message}</td>
<td>{finding.tool}</td>
</tr>
"""
stats = results['statistics']['by_severity']
return html_template.format(
total=results['statistics']['total_findings'],
critical=stats.get('critical', 0),
high=stats.get('high', 0),
medium=stats.get('medium', 0),
low=stats.get('low', 0),
findings_rows=findings_rows
)
class TfsecScanner:
def scan(self, directory: Path) -> List[SecurityFinding]:
"""Run tfsec scanner."""
cmd = ['tfsec', str(directory), '--format', 'json', '--no-color']
result = subprocess.run(cmd, capture_output=True, text=True)
findings = []
if result.stdout:
data = json.loads(result.stdout)
for issue in data.get('results', []):
finding = SecurityFinding(
tool='tfsec',
severity=issue['severity'],
file=issue['location']['filename'],
line=issue['location']['start_line'],
rule=issue['rule_id'],
message=issue['description'],
remediation=issue.get('resolution', '')
)
findings.append(finding)
return findings
Policy as Code validation ensures IaC complies with organizational standards beyond generic security rules. Custom policies encode specific requirements like approved instance types, required tags, or network architectures. Policy engines evaluate IaC against these rules, providing consistent enforcement across all infrastructure deployments.