Implementing Static Code Analysis
Implementing Static Code Analysis
Static code analysis tools examine source code to identify security vulnerabilities, code quality issues, and compliance violations. These tools use various techniques including pattern matching, data flow analysis, and abstract syntax tree (AST) analysis to understand code behavior without executing it.
# Python - Security Testing and Code Analysis
import ast
import os
import re
from typing import List, Dict, Set, Tuple, Any
import subprocess
import json
from pathlib import Path
import yaml
from dataclasses import dataclass
from enum import Enum
class VulnerabilityType(Enum):
SQL_INJECTION = "sql_injection"
XSS = "cross_site_scripting"
COMMAND_INJECTION = "command_injection"
PATH_TRAVERSAL = "path_traversal"
INSECURE_RANDOM = "insecure_random"
HARDCODED_SECRET = "hardcoded_secret"
WEAK_CRYPTO = "weak_cryptography"
XXE = "xml_external_entity"
INSECURE_DESERIALIZATION = "insecure_deserialization"
OPEN_REDIRECT = "open_redirect"
@dataclass
class SecurityFinding:
vulnerability_type: VulnerabilityType
severity: str
file_path: str
line_number: int
code_snippet: str
description: str
recommendation: str
cwe_id: str
class PythonSecurityAnalyzer:
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.findings: List[SecurityFinding] = []
self.dangerous_imports = self._load_dangerous_imports()
self.crypto_patterns = self._load_crypto_patterns()
def _load_dangerous_imports(self) -> Dict[str, Dict[str, Any]]:
"""Load patterns for dangerous imports and functions"""
return {
'subprocess': {
'functions': ['call', 'run', 'Popen', 'getoutput', 'getstatusoutput'],
'vulnerability': VulnerabilityType.COMMAND_INJECTION,
'cwe': 'CWE-78'
},
'os': {
'functions': ['system', 'popen', 'spawn*', 'exec*'],
'vulnerability': VulnerabilityType.COMMAND_INJECTION,
'cwe': 'CWE-78'
},
'pickle': {
'functions': ['load', 'loads'],
'vulnerability': VulnerabilityType.INSECURE_DESERIALIZATION,
'cwe': 'CWE-502'
},
'yaml': {
'functions': ['load'],
'vulnerability': VulnerabilityType.INSECURE_DESERIALIZATION,
'cwe': 'CWE-502'
},
'eval': {
'functions': ['eval'],
'vulnerability': VulnerabilityType.COMMAND_INJECTION,
'cwe': 'CWE-95'
},
'exec': {
'functions': ['exec'],
'vulnerability': VulnerabilityType.COMMAND_INJECTION,
'cwe': 'CWE-95'
}
}
def _load_crypto_patterns(self) -> Dict[str, Dict[str, Any]]:
"""Load patterns for weak cryptography"""
return {
'md5': {
'severity': 'high',
'description': 'MD5 is cryptographically broken',
'recommendation': 'Use SHA-256 or SHA-3 for hashing'
},
'sha1': {
'severity': 'high',
'description': 'SHA-1 is deprecated for cryptographic use',
'recommendation': 'Use SHA-256 or SHA-3'
},
'DES': {
'severity': 'critical',
'description': 'DES is obsolete and insecure',
'recommendation': 'Use AES-256-GCM for encryption'
},
'Random': {
'severity': 'high',
'description': 'Random module is not cryptographically secure',
'recommendation': 'Use secrets module for security-sensitive randomness'
}
}
def analyze_project(self) -> List[SecurityFinding]:
"""Analyze entire project for security issues"""
self.findings = []
# Find all Python files
python_files = list(self.project_path.rglob("*.py"))
for file_path in python_files:
if 'venv' in file_path.parts or '__pycache__' in file_path.parts:
continue
self.analyze_file(file_path)
# Run additional security tools
self._run_bandit()
self._run_safety_check()
self._run_semgrep()
return self.findings
def analyze_file(self, file_path: Path):
"""Analyze single Python file for security issues"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Parse AST
tree = ast.parse(content, filename=str(file_path))
# Run various analyzers
self._analyze_imports(tree, file_path, content)
self._analyze_sql_injection(tree, file_path, content)
self._analyze_hardcoded_secrets(content, file_path)
self._analyze_path_traversal(tree, file_path, content)
self._analyze_xss(tree, file_path, content)
self._analyze_crypto(tree, file_path, content)
except Exception as e:
print(f"Error analyzing {file_path}: {e}")
def _analyze_imports(self, tree: ast.AST, file_path: Path, content: str):
"""Analyze dangerous imports and function calls"""
class ImportVisitor(ast.NodeVisitor):
def __init__(self, analyzer):
self.analyzer = analyzer
self.imports = {}
def visit_Import(self, node):
for alias in node.names:
self.imports[alias.asname or alias.name] = alias.name
def visit_ImportFrom(self, node):
if node.module:
for alias in node.names:
full_name = f"{node.module}.{alias.name}"
self.imports[alias.asname or alias.name] = full_name
def visit_Call(self, node):
# Check for dangerous function calls
func_name = self._get_func_name(node.func)
# Check subprocess calls
if 'subprocess' in func_name or func_name in ['system', 'popen']:
# Check if input is user-controlled
if self._has_user_input(node):
self.analyzer.findings.append(SecurityFinding(
vulnerability_type=VulnerabilityType.COMMAND_INJECTION,
severity='critical',
file_path=str(file_path),
line_number=node.lineno,
code_snippet=ast.get_source_segment(content, node),
description='Potential command injection vulnerability',
recommendation='Use subprocess with shell=False and validate inputs',
cwe_id='CWE-78'
))
# Check eval/exec
if func_name in ['eval', 'exec'] and self._has_user_input(node):
self.analyzer.findings.append(SecurityFinding(
vulnerability_type=VulnerabilityType.COMMAND_INJECTION,
severity='critical',
file_path=str(file_path),
line_number=node.lineno,
code_snippet=ast.get_source_segment(content, node),
description='Code injection through eval/exec',
recommendation='Avoid eval/exec with user input',
cwe_id='CWE-95'
))
self.generic_visit(node)
def _get_func_name(self, node):
if isinstance(node, ast.Name):
return node.id
elif isinstance(node, ast.Attribute):
return f"{self._get_func_name(node.value)}.{node.attr}"
return ""
def _has_user_input(self, node):
# Simplified check - look for request, input, argv
source = ast.get_source_segment(content, node)
if source:
user_input_patterns = ['request.', 'input(', 'argv', 'environ']
return any(pattern in source for pattern in user_input_patterns)
return False
visitor = ImportVisitor(self)
visitor.visit(tree)
def _analyze_sql_injection(self, tree: ast.AST, file_path: Path, content: str):
"""Detect potential SQL injection vulnerabilities"""
class SQLVisitor(ast.NodeVisitor):
def __init__(self, analyzer):
self.analyzer = analyzer
def visit_Call(self, node):
# Look for execute() calls
if hasattr(node.func, 'attr') and node.func.attr in ['execute', 'executemany']:
if node.args:
# Check if using string formatting
first_arg = node.args[0]
if isinstance(first_arg, ast.BinOp) and isinstance(first_arg.op, ast.Mod):
self.analyzer.findings.append(SecurityFinding(
vulnerability_type=VulnerabilityType.SQL_INJECTION,
severity='critical',
file_path=str(file_path),
line_number=node.lineno,
code_snippet=ast.get_source_segment(content, node),
description='SQL query uses string formatting',
recommendation='Use parameterized queries',
cwe_id='CWE-89'
))
elif isinstance(first_arg, ast.JoinedStr):
# f-string in SQL
self.analyzer.findings.append(SecurityFinding(
vulnerability_type=VulnerabilityType.SQL_INJECTION,
severity='critical',
file_path=str(file_path),
line_number=node.lineno,
code_snippet=ast.get_source_segment(content, node),
description='SQL query uses f-string formatting',
recommendation='Use parameterized queries',
cwe_id='CWE-89'
))
self.generic_visit(node)
visitor = SQLVisitor(self)
visitor.visit(tree)
def _analyze_hardcoded_secrets(self, content: str, file_path: Path):
"""Detect hardcoded secrets and credentials"""
secret_patterns = [
(r'["\']?password["\']?\s*[:=]\s*["\'][^"\']+["\']', 'password'),
(r'["\']?api[_-]?key["\']?\s*[:=]\s*["\'][^"\']+["\']', 'API key'),
(r'["\']?secret[_-]?key["\']?\s*[:=]\s*["\'][^"\']+["\']', 'secret key'),
(r'["\']?aws[_-]?access[_-]?key["\']?\s*[:=]\s*["\'][A-Z0-9]{20}["\']', 'AWS access key'),
(r'["\']?aws[_-]?secret["\']?\s*[:=]\s*["\'][A-Za-z0-9/+=]{40}["\']', 'AWS secret'),
(r'["\']?token["\']?\s*[:=]\s*["\'][^"\']+["\']', 'token'),
(r'mongodb://[^:]+:[^@]+@', 'MongoDB connection string'),
(r'postgres://[^:]+:[^@]+@', 'PostgreSQL connection string'),
]
lines = content.split('\n')
for i, line in enumerate(lines):
for pattern, secret_type in secret_patterns:
if re.search(pattern, line, re.IGNORECASE):
# Check if it's not an example or environment variable
if not any(skip in line.lower() for skip in ['example', 'env.', 'os.environ', 'getenv']):
self.findings.append(SecurityFinding(
vulnerability_type=VulnerabilityType.HARDCODED_SECRET,
severity='high',
file_path=str(file_path),
line_number=i + 1,
code_snippet=line.strip(),
description=f'Hardcoded {secret_type} detected',
recommendation='Use environment variables or secret management service',
cwe_id='CWE-798'
))
def _run_bandit(self):
"""Run Bandit security linter"""
try:
result = subprocess.run(
['bandit', '-r', str(self.project_path), '-f', 'json'],
capture_output=True,
text=True
)
if result.returncode == 0:
bandit_results = json.loads(result.stdout)
for issue in bandit_results.get('results', []):
self.findings.append(SecurityFinding(
vulnerability_type=self._map_bandit_to_vuln_type(issue['test_id']),
severity=issue['issue_severity'].lower(),
file_path=issue['filename'],
line_number=issue['line_number'],
code_snippet=issue['code'],
description=issue['issue_text'],
recommendation=issue.get('issue_confidence', ''),
cwe_id=issue.get('cwe', 'CWE-unknown')
))
except Exception as e:
print(f"Bandit analysis failed: {e}")
def _map_bandit_to_vuln_type(self, test_id: str) -> VulnerabilityType:
"""Map Bandit test IDs to vulnerability types"""
mapping = {
'B201': VulnerabilityType.COMMAND_INJECTION, # flask debug
'B301': VulnerabilityType.INSECURE_DESERIALIZATION, # pickle
'B302': VulnerabilityType.INSECURE_DESERIALIZATION, # marshal
'B303': VulnerabilityType.WEAK_CRYPTO, # md5/sha1
'B304': VulnerabilityType.WEAK_CRYPTO, # des/rc4
'B305': VulnerabilityType.WEAK_CRYPTO, # cipher
'B306': VulnerabilityType.INSECURE_DESERIALIZATION, # mktemp
'B307': VulnerabilityType.COMMAND_INJECTION, # eval
'B308': VulnerabilityType.XXE, # mark_safe
'B309': VulnerabilityType.WEAK_CRYPTO, # httpsconnection
'B310': VulnerabilityType.OPEN_REDIRECT, # urllib
'B311': VulnerabilityType.INSECURE_RANDOM, # random
'B601': VulnerabilityType.COMMAND_INJECTION, # paramiko
'B602': VulnerabilityType.COMMAND_INJECTION, # subprocess
'B603': VulnerabilityType.COMMAND_INJECTION, # subprocess
'B604': VulnerabilityType.COMMAND_INJECTION, # shell
'B605': VulnerabilityType.COMMAND_INJECTION, # os
'B606': VulnerabilityType.COMMAND_INJECTION, # os
'B607': VulnerabilityType.COMMAND_INJECTION, # partial path
'B608': VulnerabilityType.SQL_INJECTION, # sql
'B609': VulnerabilityType.COMMAND_INJECTION, # wildcard
}
# Extract base test ID (e.g., B602 from B602)
base_id = test_id.split(':')[0] if ':' in test_id else test_id
return mapping.get(base_id, VulnerabilityType.COMMAND_INJECTION)
class SecurityTestFramework:
"""Framework for security-focused unit testing"""
def __init__(self):
self.test_results = []
def test_sql_injection_protection(self, db_function, test_inputs: List[str]):
"""Test function against SQL injection attempts"""
sql_injection_payloads = [
"' OR '1'='1",
"'; DROP TABLE users--",
"1' UNION SELECT null, username, password FROM users--",
"admin'--",
"' OR 1=1--",
"1' AND '1'='1",
"' UNION ALL SELECT NULL--",
"' AND SLEEP(5)--",
]
results = {
'function': db_function.__name__,
'vulnerable': False,
'payloads_tested': len(sql_injection_payloads),
'failed_payloads': []
}
for payload in sql_injection_payloads:
try:
# Test with payload
result = db_function(payload)
# Check if payload was executed (simplified check)
if isinstance(result, list) and len(result) > 0:
# Might be vulnerable if returns results with injection
results['vulnerable'] = True
results['failed_payloads'].append(payload)
except Exception as e:
# Good - function rejected the payload
pass
self.test_results.append(results)
return results
def test_xss_protection(self, render_function, test_inputs: List[str]):
"""Test function against XSS attempts"""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror='alert(1)'>",
"<svg onload='alert(1)'>",
"javascript:alert('XSS')",
"<iframe src='javascript:alert(1)'>",
"<input onfocus='alert(1)' autofocus>",
"<select onfocus='alert(1)' autofocus>",
"<textarea onfocus='alert(1)' autofocus>",
"<keygen onfocus='alert(1)' autofocus>",
"<video><source onerror='alert(1)'>",
]
results = {
'function': render_function.__name__,
'vulnerable': False,
'payloads_tested': len(xss_payloads),
'unescaped_payloads': []
}
for payload in xss_payloads:
try:
# Test with payload
output = render_function(payload)
# Check if payload appears unescaped
if payload in output or 'alert(' in output:
results['vulnerable'] = True
results['unescaped_payloads'].append(payload)
except Exception:
# Function might reject the input
pass
self.test_results.append(results)
return results
def test_authentication_bypass(self, auth_function):
"""Test authentication function for bypass vulnerabilities"""
bypass_attempts = [
{'username': 'admin', 'password': "' OR '1'='1"},
{'username': "admin'--", 'password': 'anything'},
{'username': 'admin', 'password': None},
{'username': None, 'password': None},
{'username': '', 'password': ''},
{'username': 'admin\x00', 'password': 'wrong'},
{'username': 'admin', 'password': {'$ne': 'wrong'}}, # NoSQL
]
results = {
'function': auth_function.__name__,
'vulnerable': False,
'bypass_attempts': len(bypass_attempts),
'successful_bypasses': []
}
for attempt in bypass_attempts:
try:
# Test authentication
result = auth_function(
attempt.get('username'),
attempt.get('password')
)
# If returns True or user object, might be vulnerable
if result:
results['vulnerable'] = True
results['successful_bypasses'].append(attempt)
except Exception:
# Good - function rejected invalid input
pass
self.test_results.append(results)
return results
// JavaScript - Security Testing and Code Analysis
const fs = require('fs').promises;
const path = require('path');
const parser = require('@babel/parser');
const traverse = require('@babel/traverse').default;
const { execSync } = require('child_process');
class SecurityFinding {
constructor(type, severity, filePath, line, snippet, description, recommendation, cwe) {
this.vulnerabilityType = type;
this.severity = severity;
this.filePath = filePath;
this.line = line;
this.codeSnippet = snippet;
this.description = description;
this.recommendation = recommendation;
this.cweId = cwe;
}
}
class JavaScriptSecurityAnalyzer {
constructor(projectPath) {
this.projectPath = projectPath;
this.findings = [];
this.dangerousPatterns = this.loadDangerousPatterns();
this.domSinks = this.loadDOMSinks();
}
loadDangerousPatterns() {
return {
eval: {
pattern: /eval\s*\(/,
type: 'code_injection',
severity: 'critical',
description: 'Use of eval() can lead to code injection',
recommendation: 'Avoid eval() and use safer alternatives',
cwe: 'CWE-95'
},
innerHTML: {
pattern: /\.innerHTML\s*=/,
type: 'xss',
severity: 'high',
description: 'Direct innerHTML assignment can lead to XSS',
recommendation: 'Use textContent or sanitize HTML',
cwe: 'CWE-79'
},
documentWrite: {
pattern: /document\.write\s*\(/,
type: 'xss',
severity: 'high',
description: 'document.write can lead to XSS',
recommendation: 'Use DOM methods instead',
cwe: 'CWE-79'
},
setTimeout: {
pattern: /setTimeout\s*\(\s*["'].*["']/,
type: 'code_injection',
severity: 'medium',
description: 'String argument to setTimeout',
recommendation: 'Use function reference instead',
cwe: 'CWE-95'
},
SQLInjection: {
pattern: /query\s*\(\s*['"`].*\$\{.*\}.*['"`]/,
type: 'sql_injection',
severity: 'critical',
description: 'SQL query uses template literals',
recommendation: 'Use parameterized queries',
cwe: 'CWE-89'
}
};
}
loadDOMSinks() {
// Dangerous DOM properties and methods
return {
properties: [
'innerHTML', 'outerHTML', 'insertAdjacentHTML',
'document.write', 'document.writeln'
],
methods: [
'eval', 'setTimeout', 'setInterval', 'Function',
'execScript', 'setImmediate'
],
jQuery: [
'html', 'append', 'prepend', 'after', 'before',
'replaceWith', 'replaceAll'
]
};
}
async analyzeProject() {
this.findings = [];
// Get all JavaScript/TypeScript files
const files = await this.getAllJSFiles(this.projectPath);
for (const file of files) {
if (file.includes('node_modules') || file.includes('test')) {
continue;
}
await this.analyzeFile(file);
}
// Run additional security tools
await this.runESLintSecurity();
await this.runNpmAudit();
await this.runRetireJS();
return this.findings;
}
async analyzeFile(filePath) {
try {
const content = await fs.readFile(filePath, 'utf8');
// Parse AST
const ast = parser.parse(content, {
sourceType: 'module',
plugins: ['jsx', 'typescript']
});
// Run analyzers
this.analyzeCodeInjection(ast, filePath, content);
this.analyzeXSS(ast, filePath, content);
this.analyzeSQLInjection(ast, filePath, content);
this.analyzeInsecureRandomness(ast, filePath, content);
this.analyzeHardcodedSecrets(content, filePath);
} catch (error) {
console.error(`Error analyzing ${filePath}:`, error);
}
}
analyzeCodeInjection(ast, filePath, content) {
const self = this;
traverse(ast, {
CallExpression(path) {
const node = path.node;
// Check for eval()
if (node.callee.name === 'eval') {
self.findings.push(new SecurityFinding(
'code_injection',
'critical',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'Use of eval() can execute arbitrary code',
'Avoid eval() and use JSON.parse() or other safe alternatives',
'CWE-95'
));
}
// Check for Function constructor
if (node.callee.name === 'Function' ||
(node.callee.type === 'MemberExpression' &&
node.callee.object.name === 'window' &&
node.callee.property.name === 'Function')) {
self.findings.push(new SecurityFinding(
'code_injection',
'critical',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'Function constructor can execute arbitrary code',
'Avoid Function constructor',
'CWE-95'
));
}
// Check setTimeout/setInterval with strings
if (['setTimeout', 'setInterval'].includes(node.callee.name)) {
if (node.arguments[0] && node.arguments[0].type === 'StringLiteral') {
self.findings.push(new SecurityFinding(
'code_injection',
'high',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
`String argument to ${node.callee.name} can execute code`,
'Use function reference instead of string',
'CWE-95'
));
}
}
}
});
}
analyzeXSS(ast, filePath, content) {
const self = this;
traverse(ast, {
AssignmentExpression(path) {
const node = path.node;
// Check for innerHTML assignment
if (node.left.type === 'MemberExpression' &&
node.left.property.name === 'innerHTML') {
// Check if right side contains user input
if (self.containsUserInput(node.right)) {
self.findings.push(new SecurityFinding(
'xss',
'critical',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'innerHTML assignment with user input can lead to XSS',
'Use textContent or sanitize HTML with DOMPurify',
'CWE-79'
));
}
}
},
CallExpression(path) {
const node = path.node;
// Check for document.write
if (node.callee.type === 'MemberExpression' &&
node.callee.object.name === 'document' &&
node.callee.property.name === 'write') {
self.findings.push(new SecurityFinding(
'xss',
'high',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'document.write can lead to XSS vulnerabilities',
'Use createElement and appendChild instead',
'CWE-79'
));
}
// Check jQuery methods
if (node.callee.type === 'MemberExpression' &&
self.domSinks.jQuery.includes(node.callee.property.name)) {
if (self.containsUserInput(node.arguments[0])) {
self.findings.push(new SecurityFinding(
'xss',
'high',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
`jQuery ${node.callee.property.name}() with user input`,
'Sanitize HTML or use text() method',
'CWE-79'
));
}
}
}
});
}
analyzeSQLInjection(ast, filePath, content) {
const self = this;
traverse(ast, {
CallExpression(path) {
const node = path.node;
// Look for database query methods
const queryMethods = ['query', 'execute', 'exec', 'run'];
if (node.callee.type === 'MemberExpression' &&
queryMethods.includes(node.callee.property.name)) {
// Check first argument (query string)
const queryArg = node.arguments[0];
if (queryArg) {
// Check for template literals with expressions
if (queryArg.type === 'TemplateLiteral' &&
queryArg.expressions.length > 0) {
self.findings.push(new SecurityFinding(
'sql_injection',
'critical',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'SQL query uses template literal with expressions',
'Use parameterized queries or prepared statements',
'CWE-89'
));
}
// Check for string concatenation
if (queryArg.type === 'BinaryExpression' &&
queryArg.operator === '+') {
self.findings.push(new SecurityFinding(
'sql_injection',
'critical',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'SQL query uses string concatenation',
'Use parameterized queries or prepared statements',
'CWE-89'
));
}
}
}
}
});
}
analyzeInsecureRandomness(ast, filePath, content) {
const self = this;
traverse(ast, {
CallExpression(path) {
const node = path.node;
// Check for Math.random() in security contexts
if (node.callee.type === 'MemberExpression' &&
node.callee.object.name === 'Math' &&
node.callee.property.name === 'random') {
// Try to determine context
const parent = path.parent;
const context = self.getContext(path);
if (context.includes('token') ||
context.includes('password') ||
context.includes('key') ||
context.includes('secret')) {
self.findings.push(new SecurityFinding(
'insecure_random',
'high',
filePath,
node.loc.start.line,
self.getCodeSnippet(content, node.loc),
'Math.random() is not cryptographically secure',
'Use crypto.randomBytes() for security-sensitive randomness',
'CWE-338'
));
}
}
}
});
}
analyzeHardcodedSecrets(content, filePath) {
const secretPatterns = [
{
pattern: /["']?password["']?\s*[:=]\s*["'][^"']+["']/gi,
type: 'password'
},
{
pattern: /["']?api[_-]?key["']?\s*[:=]\s*["'][^"']+["']/gi,
type: 'API key'
},
{
pattern: /["']?secret["']?\s*[:=]\s*["'][^"']+["']/gi,
type: 'secret'
},
{
pattern: /["']?token["']?\s*[:=]\s*["'][^"']+["']/gi,
type: 'token'
},
{
pattern: /mongodb:\/\/[^:]+:[^@]+@/gi,
type: 'MongoDB connection string'
},
{
pattern: /postgres:\/\/[^:]+:[^@]+@/gi,
type: 'PostgreSQL connection string'
}
];
const lines = content.split('\n');
lines.forEach((line, index) => {
// Skip comments and obvious non-secrets
if (line.trim().startsWith('//') ||
line.includes('process.env') ||
line.includes('example') ||
line.includes('placeholder')) {
return;
}
secretPatterns.forEach(({ pattern, type }) => {
if (pattern.test(line)) {
this.findings.push(new SecurityFinding(
'hardcoded_secret',
'high',
filePath,
index + 1,
line.trim(),
`Hardcoded ${type} detected`,
'Use environment variables or secure key management',
'CWE-798'
));
}
});
});
}
containsUserInput(node) {
if (!node) return false;
// Check for common user input sources
const userInputSources = [
'req.body', 'req.query', 'req.params',
'request.body', 'request.query', 'request.params',
'location.search', 'location.hash',
'window.location', 'document.location',
'localStorage', 'sessionStorage',
'document.cookie'
];
// Convert node to string for simple check
const nodeStr = this.nodeToString(node);
return userInputSources.some(source => nodeStr.includes(source));
}
nodeToString(node) {
// Simple node to string conversion
if (node.type === 'Identifier') {
return node.name;
} else if (node.type === 'MemberExpression') {
return `${this.nodeToString(node.object)}.${node.property.name}`;
} else if (node.type === 'StringLiteral') {
return node.value;
}
return '';
}
getContext(path) {
// Get surrounding context to understand usage
let context = '';
let current = path;
for (let i = 0; i < 3 && current.parent; i++) {
if (current.parent.type === 'VariableDeclarator') {
context += current.parent.id.name + ' ';
} else if (current.parent.type === 'AssignmentExpression') {
context += this.nodeToString(current.parent.left) + ' ';
}
current = current.parentPath;
}
return context.toLowerCase();
}
getCodeSnippet(content, loc) {
const lines = content.split('\n');
const startLine = Math.max(0, loc.start.line - 2);
const endLine = Math.min(lines.length, loc.end.line + 1);
return lines.slice(startLine, endLine).join('\n');
}
async getAllJSFiles(dir, files = []) {
const entries = await fs.readdir(dir, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(dir, entry.name);
if (entry.isDirectory() &&
!entry.name.startsWith('.') &&
entry.name !== 'node_modules') {
await this.getAllJSFiles(fullPath, files);
} else if (entry.isFile() &&
(entry.name.endsWith('.js') ||
entry.name.endsWith('.jsx') ||
entry.name.endsWith('.ts') ||
entry.name.endsWith('.tsx'))) {
files.push(fullPath);
}
}
return files;
}
async runESLintSecurity() {
try {
// Run ESLint with security plugin
const result = execSync(
`npx eslint ${this.projectPath} --plugin security --format json`,
{ encoding: 'utf8', stdio: 'pipe' }
);
const eslintResults = JSON.parse(result);
// Process ESLint findings
for (const file of eslintResults) {
for (const message of file.messages) {
if (message.ruleId && message.ruleId.startsWith('security/')) {
this.findings.push(new SecurityFinding(
this.mapESLintRule(message.ruleId),
message.severity === 2 ? 'high' : 'medium',
file.filePath,
message.line,
message.source,
message.message,
'Follow ESLint security recommendations',
'CWE-' + (message.ruleId.split('/')[1] || 'unknown')
));
}
}
}
} catch (error) {
console.error('ESLint security check failed:', error.message);
}
}
mapESLintRule(ruleId) {
const mapping = {
'security/detect-eval-with-expression': 'code_injection',
'security/detect-non-literal-regexp': 'regex_injection',
'security/detect-non-literal-require': 'code_injection',
'security/detect-object-injection': 'injection',
'security/detect-possible-timing-attack': 'timing_attack',
'security/detect-pseudoRandomBytes': 'insecure_random',
'security/detect-unsafe-regex': 'redos',
'security/detect-child-process': 'command_injection'
};
return mapping[ruleId] || 'security_issue';
}
}
// Security-focused testing framework
class SecurityTestRunner {
constructor() {
this.results = [];
}
async testXSSProtection(renderFunction) {
const xssPayloads = [
'<script>alert("XSS")</script>',
'<img src=x onerror=alert(1)>',
'<svg onload=alert(1)>',
'javascript:alert(1)',
'<iframe src=javascript:alert(1)>',
'<body onload=alert(1)>',
'<input autofocus onfocus=alert(1)>',
'<select autofocus onfocus=alert(1)>',
'<textarea autofocus onfocus=alert(1)>',
'<keygen autofocus onfocus=alert(1)>',
'<video><source onerror=alert(1)>',
'<audio src=x onerror=alert(1)>',
'<details open ontoggle=alert(1)>',
'<marquee onstart=alert(1)>'
];
const results = {
function: renderFunction.name,
vulnerable: false,
testedPayloads: xssPayloads.length,
failedPayloads: []
};
for (const payload of xssPayloads) {
try {
const output = await renderFunction(payload);
// Check if payload is present unescaped
if (output.includes(payload) ||
output.includes('alert(') ||
output.includes('onerror=')) {
results.vulnerable = true;
results.failedPayloads.push({
payload,
output: output.substring(0, 100)
});
}
} catch (error) {
// Error handling
}
}
this.results.push(results);
return results;
}
async testSQLInjection(queryFunction) {
const sqlPayloads = [
"' OR '1'='1",
"'; DROP TABLE users--",
"' UNION SELECT * FROM users--",
"admin'--",
"' OR 1=1--",
"1'; WAITFOR DELAY '00:00:05'--",
"' AND (SELECT * FROM users)--",
"' OR EXISTS(SELECT * FROM users WHERE username='admin')--"
];
const results = {
function: queryFunction.name,
vulnerable: false,
testedPayloads: sqlPayloads.length,
failedPayloads: []
};
for (const payload of sqlPayloads) {
try {
const result = await queryFunction(payload);
// Check for signs of injection
if (Array.isArray(result) && result.length > 0) {
// Might be vulnerable if returns unexpected results
results.vulnerable = true;
results.failedPayloads.push(payload);
}
} catch (error) {
// Good - query failed
if (!error.message.includes('syntax')) {
// But if not syntax error, might still be vulnerable
results.errors = results.errors || [];
results.errors.push({
payload,
error: error.message
});
}
}
}
this.results.push(results);
return results;
}
async testAuthenticationBypass(authFunction) {
const bypassPayloads = [
{ username: 'admin', password: "' OR '1'='1" },
{ username: "admin'--", password: 'anything' },
{ username: 'admin', password: null },
{ username: null, password: null },
{ username: '', password: '' },
{ username: 'admin\x00', password: 'wrong' },
{ username: 'admin', password: { $ne: null } }, // NoSQL
{ username: { $gt: '' }, password: { $gt: '' } }, // NoSQL
{ username: 'admin', password: undefined },
{ username: ['admin'], password: 'wrong' } // Type confusion
];
const results = {
function: authFunction.name,
vulnerable: false,
testedPayloads: bypassPayloads.length,
successfulBypasses: []
};
for (const payload of bypassPayloads) {
try {
const result = await authFunction(
payload.username,
payload.password
);
// If returns truthy value, authentication was bypassed
if (result) {
results.vulnerable = true;
results.successfulBypasses.push(payload);
}
} catch (error) {
// Good - authentication failed
}
}
this.results.push(results);
return results;
}
}
Implementing comprehensive security testing and code analysis is essential for identifying vulnerabilities before they reach production. By combining static analysis, dynamic testing, and security-focused unit tests, development teams can build more secure applications while maintaining development velocity. Regular security testing should be integrated into CI/CD pipelines to ensure ongoing protection.## Security in Development Workflow
Integrating security into the development workflow transforms security from a gate at the end of development to an integral part of the entire software development lifecycle. This DevSecOps approach ensures that security considerations are addressed early and continuously, reducing both the cost and impact of vulnerabilities. This chapter explores comprehensive strategies for embedding security practices into every stage of development, from initial design through deployment and maintenance.