Implementing Static Code Analysis

Implementing Static Code Analysis

Static code analysis tools examine source code to identify security vulnerabilities, code quality issues, and compliance violations. These tools use various techniques including pattern matching, data flow analysis, and abstract syntax tree (AST) analysis to understand code behavior without executing it.

# Python - Security Testing and Code Analysis
import ast
import os
import re
from typing import List, Dict, Set, Tuple, Any
import subprocess
import json
from pathlib import Path
import yaml
from dataclasses import dataclass
from enum import Enum

class VulnerabilityType(Enum):
    """Categories of security weakness that a finding can be tagged with.

    Each member's value is the lowercase identifier string for that category.
    """
    # Injection-style weaknesses.
    SQL_INJECTION = "sql_injection"
    XSS = "cross_site_scripting"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    # Randomness, secrets and cryptography.
    INSECURE_RANDOM = "insecure_random"
    HARDCODED_SECRET = "hardcoded_secret"
    WEAK_CRYPTO = "weak_cryptography"
    # Parsing, deserialization and redirects.
    XXE = "xml_external_entity"
    INSECURE_DESERIALIZATION = "insecure_deserialization"
    OPEN_REDIRECT = "open_redirect"

@dataclass
class SecurityFinding:
    """A single potential security issue located in a source file."""
    vulnerability_type: VulnerabilityType  # category of the weakness
    severity: str  # severity label, e.g. 'critical' or 'high'
    file_path: str  # file in which the issue was found
    line_number: int  # line of the offending code
    code_snippet: str  # source text of the offending code
    description: str  # what was detected
    recommendation: str  # suggested remediation
    cwe_id: str  # CWE identifier such as 'CWE-89'

class PythonSecurityAnalyzer:
    def __init__(self, project_path: str):
        """Set up an analyzer rooted at ``project_path``.

        Args:
            project_path: Directory that ``analyze_project`` searches for
                ``*.py`` files.
        """
        # Stored as a Path so the project tree can be globbed.
        self.project_path = Path(project_path)
        # Findings accumulated by the analyzers; reset by analyze_project().
        self.findings: List[SecurityFinding] = []
        # Static lookup tables built once per analyzer instance.
        self.dangerous_imports = self._load_dangerous_imports()
        self.crypto_patterns = self._load_crypto_patterns()
        
    def _load_dangerous_imports(self) -> Dict[str, Dict[str, Any]]:
        """Build the lookup table of modules/builtins with risky functions.

        Returns:
            A dict keyed by module (or builtin) name. Each entry holds the
            risky ``functions``, the ``vulnerability`` category and the
            ``cwe`` identifier associated with them.
        """
        def entry(functions, vulnerability, cwe):
            # Keeps the three keys in one place so every row has the same shape.
            return {
                'functions': functions,
                'vulnerability': vulnerability,
                'cwe': cwe,
            }

        command_injection = VulnerabilityType.COMMAND_INJECTION
        deserialization = VulnerabilityType.INSECURE_DESERIALIZATION

        table = {}
        table['subprocess'] = entry(
            ['call', 'run', 'Popen', 'getoutput', 'getstatusoutput'],
            command_injection,
            'CWE-78',
        )
        table['os'] = entry(
            ['system', 'popen', 'spawn*', 'exec*'],
            command_injection,
            'CWE-78',
        )
        table['pickle'] = entry(['load', 'loads'], deserialization, 'CWE-502')
        table['yaml'] = entry(['load'], deserialization, 'CWE-502')
        table['eval'] = entry(['eval'], command_injection, 'CWE-95')
        table['exec'] = entry(['exec'], command_injection, 'CWE-95')
        return table
    
    def _load_crypto_patterns(self) -> Dict[str, Dict[str, Any]]:
        """Load patterns for weak cryptography"""
        return {
            'md5': {
                'severity': 'high',
                'description': 'MD5 is cryptographically broken',
                'recommendation': 'Use SHA-256 or SHA-3 for hashing'
            },
            'sha1': {
                'severity': 'high',
                'description': 'SHA-1 is deprecated for cryptographic use',
                'recommendation': 'Use SHA-256 or SHA-3'
            },
            'DES': {
                'severity': 'critical',
                'description': 'DES is obsolete and insecure',
                'recommendation': 'Use AES-256-GCM for encryption'
            },
            'Random': {
                'severity': 'high',
                'description': 'Random module is not cryptographically secure',
                'recommendation': 'Use secrets module for security-sensitive randomness'
            }
        }
    
    def analyze_project(self) -> List[SecurityFinding]:
        """Analyze entire project for security issues"""
        self.findings = []
        
        # Find all Python files
        python_files = list(self.project_path.rglob("*.py"))
        
        for file_path in python_files:
            if 'venv' in file_path.parts or '__pycache__' in file_path.parts:
                continue
                
            self.analyze_file(file_path)
        
        # Run additional security tools
        self._run_bandit()
        self._run_safety_check()
        self._run_semgrep()
        
        return self.findings
    
    def analyze_file(self, file_path: Path):
        """Analyze single Python file for security issues"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                
            # Parse AST
            tree = ast.parse(content, filename=str(file_path))
            
            # Run various analyzers
            self._analyze_imports(tree, file_path, content)
            self._analyze_sql_injection(tree, file_path, content)
            self._analyze_hardcoded_secrets(content, file_path)
            self._analyze_path_traversal(tree, file_path, content)
            self._analyze_xss(tree, file_path, content)
            self._analyze_crypto(tree, file_path, content)
            
        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")
    
    def _analyze_imports(self, tree: ast.AST, file_path: Path, content: str):
        """Flag command- and code-injection risks in dangerous function calls.

        Walks ``tree`` once, recording import aliases so that a call is
        checked under its real dotted name: ``os.system(...)``,
        ``sp.run(...)`` after ``import subprocess as sp`` and a bare
        ``system(...)`` after ``from os import system`` are all recognized.
        A call is reported only when its source text also mentions a likely
        source of user input (a deliberately simple heuristic).

        Args:
            tree: Parsed module AST.
            file_path: Path recorded on each finding.
            content: Source text ``tree`` was parsed from; used for code
                snippets and for the user-input heuristic.
        """
        shell_functions = ('system', 'popen')
        dynamic_code_functions = ('eval', 'exec')
        user_input_patterns = ('request.', 'input(', 'argv', 'environ')

        class ImportVisitor(ast.NodeVisitor):
            def __init__(self, analyzer):
                self.analyzer = analyzer
                # Locally bound name -> dotted name it refers to.
                self.imports = {}

            def visit_Import(self, node):
                for alias in node.names:
                    if alias.asname:
                        self.imports[alias.asname] = alias.name
                    else:
                        # ``import a.b`` binds only the top-level name ``a``.
                        top_level = alias.name.split('.')[0]
                        self.imports[top_level] = top_level

            def visit_ImportFrom(self, node):
                if node.module:
                    for alias in node.names:
                        full_name = f"{node.module}.{alias.name}"
                        self.imports[alias.asname or alias.name] = full_name

            def visit_Call(self, node):
                func_name = self._get_func_name(node.func)

                # Shell/process execution fed by user-controlled data.
                if self._is_command_call(func_name) and self._has_user_input(node):
                    self._report(
                        node,
                        description='Potential command injection vulnerability',
                        recommendation='Use subprocess with shell=False and validate inputs',
                        cwe_id='CWE-78',
                    )

                # Dynamic code execution fed by user-controlled data.
                if func_name in dynamic_code_functions and self._has_user_input(node):
                    self._report(
                        node,
                        description='Code injection through eval/exec',
                        recommendation='Avoid eval/exec with user input',
                        cwe_id='CWE-95',
                    )

                self.generic_visit(node)

            def _report(self, node, description, recommendation, cwe_id):
                self.analyzer.findings.append(SecurityFinding(
                    vulnerability_type=VulnerabilityType.COMMAND_INJECTION,
                    severity='critical',
                    file_path=str(file_path),
                    line_number=node.lineno,
                    code_snippet=ast.get_source_segment(content, node),
                    description=description,
                    recommendation=recommendation,
                    cwe_id=cwe_id,
                ))

            def _is_command_call(self, func_name):
                # Names as written in the source (covers un-imported usage).
                if 'subprocess' in func_name or func_name in shell_functions:
                    return True
                # Names after resolving import aliases.
                resolved = self._resolve(func_name)
                if resolved.startswith('subprocess.'):
                    return True
                module, _, attr = resolved.rpartition('.')
                return module == 'os' and attr in shell_functions

            def _resolve(self, func_name):
                # Replace the leading name with what it was imported as.
                head, dot, rest = func_name.partition('.')
                target = self.imports.get(head)
                if target is None:
                    return func_name
                return f"{target}{dot}{rest}"

            def _get_func_name(self, node):
                if isinstance(node, ast.Name):
                    return node.id
                if isinstance(node, ast.Attribute):
                    return f"{self._get_func_name(node.value)}.{node.attr}"
                return ""

            def _has_user_input(self, node):
                # Simplified check: look for request, input, argv, environ in
                # the text of the whole call expression.
                source = ast.get_source_segment(content, node)
                if not source:
                    return False
                return any(pattern in source for pattern in user_input_patterns)

        visitor = ImportVisitor(self)
        visitor.visit(tree)
    
    def _analyze_sql_injection(self, tree: ast.AST, file_path: Path, content: str):
        """Detect SQL statements assembled from dynamic strings.

        Inspects the first positional argument of every ``.execute()`` /
        ``.executemany()`` call and reports it when the query text is built
        with ``%`` formatting, ``+`` concatenation involving a non-literal,
        an f-string that interpolates a value, or ``str.format()``.

        Args:
            tree: Parsed module AST.
            file_path: Path recorded on each finding.
            content: Source text ``tree`` was parsed from; used for snippets.
        """
        class SQLVisitor(ast.NodeVisitor):
            def __init__(self, analyzer):
                self.analyzer = analyzer

            def visit_Call(self, node):
                # Look for execute() calls
                method = getattr(node.func, 'attr', None)
                if method in ('execute', 'executemany') and node.args:
                    description = self._describe_unsafe_query(node.args[0])
                    if description is not None:
                        self.analyzer.findings.append(SecurityFinding(
                            vulnerability_type=VulnerabilityType.SQL_INJECTION,
                            severity='critical',
                            file_path=str(file_path),
                            line_number=node.lineno,
                            code_snippet=ast.get_source_segment(content, node),
                            description=description,
                            recommendation='Use parameterized queries',
                            cwe_id='CWE-89'
                        ))

                self.generic_visit(node)

            def _describe_unsafe_query(self, query):
                """Return a description if ``query`` is built dynamically, else None."""
                if isinstance(query, ast.BinOp):
                    if isinstance(query.op, ast.Mod):
                        return 'SQL query uses string formatting'
                    # "a" + "b" is still a literal; only flag real concatenation.
                    if isinstance(query.op, ast.Add) and not self._is_literal(query):
                        return 'SQL query uses string concatenation'
                elif isinstance(query, ast.JoinedStr):
                    # An f-string without placeholders is just a constant.
                    if any(isinstance(part, ast.FormattedValue) for part in query.values):
                        return 'SQL query uses f-string formatting'
                elif (isinstance(query, ast.Call)
                      and isinstance(query.func, ast.Attribute)
                      and query.func.attr == 'format'):
                    return 'SQL query uses str.format()'
                return None

            def _is_literal(self, node):
                if isinstance(node, ast.BinOp):
                    return self._is_literal(node.left) and self._is_literal(node.right)
                return isinstance(node, ast.Constant)

        visitor = SQLVisitor(self)
        visitor.visit(tree)
    
    def _analyze_hardcoded_secrets(self, content: str, file_path: Path):
        """Scan source text line by line for credentials written as literals.

        Every pattern that matches a line yields its own finding, unless the
        line mentions an example value or an environment lookup.

        Args:
            content: Full source text of the file.
            file_path: Path recorded on each finding.
        """
        labelled_patterns = (
            ('password', r'["\']?password["\']?\s*[:=]\s*["\'][^"\']+["\']'),
            ('API key', r'["\']?api[_-]?key["\']?\s*[:=]\s*["\'][^"\']+["\']'),
            ('secret key', r'["\']?secret[_-]?key["\']?\s*[:=]\s*["\'][^"\']+["\']'),
            ('AWS access key', r'["\']?aws[_-]?access[_-]?key["\']?\s*[:=]\s*["\'][A-Z0-9]{20}["\']'),
            ('AWS secret', r'["\']?aws[_-]?secret["\']?\s*[:=]\s*["\'][A-Za-z0-9/+=]{40}["\']'),
            ('token', r'["\']?token["\']?\s*[:=]\s*["\'][^"\']+["\']'),
            ('MongoDB connection string', r'mongodb://[^:]+:[^@]+@'),
            ('PostgreSQL connection string', r'postgres://[^:]+:[^@]+@'),
        )
        matchers = [
            (label, re.compile(regex, re.IGNORECASE))
            for label, regex in labelled_patterns
        ]
        # Lines mentioning any of these are treated as examples or env lookups.
        ignore_markers = ('example', 'env.', 'os.environ', 'getenv')

        for line_number, text in enumerate(content.split('\n'), start=1):
            lowered = text.lower()
            if any(marker in lowered for marker in ignore_markers):
                continue

            for secret_type, matcher in matchers:
                if matcher.search(text) is None:
                    continue
                self.findings.append(SecurityFinding(
                    vulnerability_type=VulnerabilityType.HARDCODED_SECRET,
                    severity='high',
                    file_path=str(file_path),
                    line_number=line_number,
                    code_snippet=text.strip(),
                    description=f'Hardcoded {secret_type} detected',
                    recommendation='Use environment variables or secret management service',
                    cwe_id='CWE-798'
                ))
    
    def _run_bandit(self):
        """Run the Bandit linter over the project and import its findings.

        Bandit exits with status 0 when the code is clean and 1 when it
        reports issues, so the JSON report is parsed for both; parsing only
        on status 0 would drop every real finding. Any failure (Bandit not
        installed, unexpected exit status, unparsable output) is printed and
        otherwise ignored so the rest of the analysis can continue.
        """
        try:
            result = subprocess.run(
                ['bandit', '-r', str(self.project_path), '-f', 'json'],
                capture_output=True,
                text=True
            )
        except (OSError, subprocess.SubprocessError) as e:
            print(f"Bandit analysis failed: {e}")
            return

        if result.returncode not in (0, 1):
            detail = result.stderr.strip() or f"exit status {result.returncode}"
            print(f"Bandit analysis failed: {detail}")
            return

        try:
            bandit_results = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            print(f"Bandit analysis failed: {e}")
            return

        for issue in bandit_results.get('results', []):
            # Bandit reports the CWE as {"id": <int>, "link": <url>} under
            # "issue_cwe"; fall back to the legacy flat "cwe" key.
            cwe = issue.get('issue_cwe')
            if isinstance(cwe, dict) and cwe.get('id'):
                cwe_id = f"CWE-{cwe['id']}"
            else:
                cwe_id = issue.get('cwe', 'CWE-unknown')

            try:
                self.findings.append(SecurityFinding(
                    vulnerability_type=self._map_bandit_to_vuln_type(issue['test_id']),
                    severity=issue['issue_severity'].lower(),
                    file_path=issue['filename'],
                    line_number=issue['line_number'],
                    code_snippet=issue['code'],
                    description=issue['issue_text'],
                    # Prefer Bandit's documentation link; the confidence level
                    # is kept as the fallback for reports that lack it.
                    recommendation=issue.get('more_info') or issue.get('issue_confidence', ''),
                    cwe_id=cwe_id
                ))
            except KeyError as e:
                print(f"Bandit analysis failed: issue is missing field {e}")
    
    def _map_bandit_to_vuln_type(self, test_id: str) -> VulnerabilityType:
        """Map a Bandit test ID to the closest ``VulnerabilityType``.

        Args:
            test_id: Bandit test identifier such as ``'B602'``; anything
                after a ``':'`` is ignored.

        Returns:
            The matching category. IDs that are not in the table fall back
            to ``COMMAND_INJECTION`` because the enum has no "unknown" member.
        """
        mapping = {
            'B105': VulnerabilityType.HARDCODED_SECRET,  # hardcoded password string
            'B106': VulnerabilityType.HARDCODED_SECRET,  # hardcoded password funcarg
            'B107': VulnerabilityType.HARDCODED_SECRET,  # hardcoded password default
            'B201': VulnerabilityType.COMMAND_INJECTION,  # flask debug
            'B301': VulnerabilityType.INSECURE_DESERIALIZATION,  # pickle
            'B302': VulnerabilityType.INSECURE_DESERIALIZATION,  # marshal
            'B303': VulnerabilityType.WEAK_CRYPTO,  # md5/sha1
            'B304': VulnerabilityType.WEAK_CRYPTO,  # des/rc4
            'B305': VulnerabilityType.WEAK_CRYPTO,  # cipher
            # NOTE(review): mktemp is an insecure-temp-file issue; the enum has
            # no closer category, so the original mapping is kept.
            'B306': VulnerabilityType.INSECURE_DESERIALIZATION,  # mktemp
            'B307': VulnerabilityType.COMMAND_INJECTION,  # eval
            'B308': VulnerabilityType.XSS,  # mark_safe bypasses HTML escaping
            'B309': VulnerabilityType.WEAK_CRYPTO,  # httpsconnection
            'B310': VulnerabilityType.OPEN_REDIRECT,  # urllib
            'B311': VulnerabilityType.INSECURE_RANDOM,  # random
            'B324': VulnerabilityType.WEAK_CRYPTO,  # hashlib weak hashes
            'B506': VulnerabilityType.INSECURE_DESERIALIZATION,  # yaml.load
            'B601': VulnerabilityType.COMMAND_INJECTION,  # paramiko
            'B602': VulnerabilityType.COMMAND_INJECTION,  # subprocess
            'B603': VulnerabilityType.COMMAND_INJECTION,  # subprocess
            'B604': VulnerabilityType.COMMAND_INJECTION,  # shell
            'B605': VulnerabilityType.COMMAND_INJECTION,  # os
            'B606': VulnerabilityType.COMMAND_INJECTION,  # os
            'B607': VulnerabilityType.COMMAND_INJECTION,  # partial path
            'B608': VulnerabilityType.SQL_INJECTION,  # sql
            'B609': VulnerabilityType.COMMAND_INJECTION,  # wildcard
            'B610': VulnerabilityType.SQL_INJECTION,  # django extra()
            'B611': VulnerabilityType.SQL_INJECTION,  # django RawSQL
            'B701': VulnerabilityType.XSS,  # jinja2 autoescape disabled
            'B702': VulnerabilityType.XSS,  # mako templates
            'B703': VulnerabilityType.XSS,  # django mark_safe
        }
        # B313-B320 are the unsafe XML parser checks.
        mapping.update({f'B{number}': VulnerabilityType.XXE for number in range(313, 321)})

        # Keep only the base test ID (text before any ':').
        base_id = test_id.split(':')[0]

        return mapping.get(base_id, VulnerabilityType.COMMAND_INJECTION)

class SecurityTestFramework:
    """Framework for security-focused unit testing.

    Each ``test_*`` method feeds a fixed list of attack payloads to a
    caller-supplied function, appends a summary dict to
    ``self.test_results`` and returns that same dict.
    """

    def __init__(self):
        # One summary dict per test_* call, in call order.
        self.test_results = []

    def test_sql_injection_protection(self, db_function, test_inputs: List[str]):
        """Test function against SQL injection attempts.

        Args:
            db_function: Callable taking one string argument.
            test_inputs: Accepted for interface compatibility; the built-in
                payload list is what gets tested.

        Returns:
            Dict with ``function``, ``vulnerable``, ``payloads_tested`` and
            ``failed_payloads``.
        """
        sql_injection_payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "1' UNION SELECT null, username, password FROM users--",
            "admin'--",
            "' OR 1=1--",
            "1' AND '1'='1",
            "' UNION ALL SELECT NULL--",
            "' AND SLEEP(5)--",
        ]

        results = {
            'function': db_function.__name__,
            'vulnerable': False,
            'payloads_tested': len(sql_injection_payloads),
            'failed_payloads': []
        }

        for payload in sql_injection_payloads:
            try:
                result = db_function(payload)
            except Exception:
                # Good - function rejected the payload
                continue

            # Simplified check: a non-empty list of rows suggests the
            # injected condition was executed.
            if isinstance(result, list) and result:
                results['vulnerable'] = True
                results['failed_payloads'].append(payload)

        self.test_results.append(results)
        return results

    def test_xss_protection(self, render_function, test_inputs: List[str]):
        """Test function against XSS attempts.

        A payload counts as unescaped only when it appears verbatim in the
        rendered output.

        Args:
            render_function: Callable taking one string and returning the
                rendered text.
            test_inputs: Accepted for interface compatibility; the built-in
                payload list is what gets tested.

        Returns:
            Dict with ``function``, ``vulnerable``, ``payloads_tested`` and
            ``unescaped_payloads``.
        """
        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror='alert(1)'>",
            "<svg onload='alert(1)'>",
            "javascript:alert('XSS')",
            "<iframe src='javascript:alert(1)'>",
            "<input onfocus='alert(1)' autofocus>",
            "<select onfocus='alert(1)' autofocus>",
            "<textarea onfocus='alert(1)' autofocus>",
            "<keygen onfocus='alert(1)' autofocus>",
            "<video><source onerror='alert(1)'>",
        ]

        results = {
            'function': render_function.__name__,
            'vulnerable': False,
            'payloads_tested': len(xss_payloads),
            'unescaped_payloads': []
        }

        for payload in xss_payloads:
            try:
                output = render_function(payload)
            except Exception:
                # Function might reject the input
                continue

            # Non-text output cannot be searched for the payload.
            if not isinstance(output, str):
                continue

            # Correctly escaped output still contains the text "alert(", so
            # matching on that substring would flag every safe renderer.
            if payload in output:
                results['vulnerable'] = True
                results['unescaped_payloads'].append(payload)

        self.test_results.append(results)
        return results

    def test_authentication_bypass(self, auth_function):
        """Test authentication function for bypass vulnerabilities.

        Args:
            auth_function: Callable taking ``(username, password)``; any
                truthy return value is treated as a successful login.

        Returns:
            Dict with ``function``, ``vulnerable``, ``bypass_attempts`` and
            ``successful_bypasses``.
        """
        bypass_attempts = [
            {'username': 'admin', 'password': "' OR '1'='1"},
            {'username': "admin'--", 'password': 'anything'},
            {'username': 'admin', 'password': None},
            {'username': None, 'password': None},
            {'username': '', 'password': ''},
            {'username': 'admin\x00', 'password': 'wrong'},
            {'username': 'admin', 'password': {'$ne': 'wrong'}},  # NoSQL
        ]

        results = {
            'function': auth_function.__name__,
            'vulnerable': False,
            'bypass_attempts': len(bypass_attempts),
            'successful_bypasses': []
        }

        for attempt in bypass_attempts:
            try:
                result = auth_function(
                    attempt.get('username'),
                    attempt.get('password')
                )
            except Exception:
                # Good - function rejected invalid input
                continue

            # If returns True or user object, might be vulnerable
            if result:
                results['vulnerable'] = True
                results['successful_bypasses'].append(attempt)

        self.test_results.append(results)
        return results
// JavaScript - Security Testing and Code Analysis
const fs = require('fs').promises;
const path = require('path');
const parser = require('@babel/parser');
const traverse = require('@babel/traverse').default;
const { execSync } = require('child_process');

/**
 * Record describing one potential security issue found in a source file.
 */
class SecurityFinding {
    /**
     * @param {string} type - Vulnerability category, e.g. 'xss'.
     * @param {string} severity - Severity label, e.g. 'critical'.
     * @param {string} filePath - File in which the issue was found.
     * @param {number} line - Line of the offending code.
     * @param {string} snippet - Source text of the offending code.
     * @param {string} description - What was detected.
     * @param {string} recommendation - Suggested remediation.
     * @param {string} cwe - CWE identifier such as 'CWE-79'.
     */
    constructor(type, severity, filePath, line, snippet, description, recommendation, cwe) {
        // Property order matches the constructor's parameter order.
        Object.assign(this, {
            vulnerabilityType: type,
            severity,
            filePath,
            line,
            codeSnippet: snippet,
            description,
            recommendation,
            cweId: cwe,
        });
    }
}

class JavaScriptSecurityAnalyzer {
    /**
     * @param {string} projectPath - Root of the project; passed to
     *   getAllJSFiles() when analyzeProject() runs.
     */
    constructor(projectPath) {
        this.projectPath = projectPath;
        // Findings accumulated by the analyzers; reset by analyzeProject().
        this.findings = [];
        // Static lookup tables built once per analyzer instance.
        this.dangerousPatterns = this.loadDangerousPatterns();
        this.domSinks = this.loadDOMSinks();
    }
    
    loadDangerousPatterns() {
        return {
            eval: {
                pattern: /eval\s*\(/,
                type: 'code_injection',
                severity: 'critical',
                description: 'Use of eval() can lead to code injection',
                recommendation: 'Avoid eval() and use safer alternatives',
                cwe: 'CWE-95'
            },
            innerHTML: {
                pattern: /\.innerHTML\s*=/,
                type: 'xss',
                severity: 'high',
                description: 'Direct innerHTML assignment can lead to XSS',
                recommendation: 'Use textContent or sanitize HTML',
                cwe: 'CWE-79'
            },
            documentWrite: {
                pattern: /document\.write\s*\(/,
                type: 'xss',
                severity: 'high',
                description: 'document.write can lead to XSS',
                recommendation: 'Use DOM methods instead',
                cwe: 'CWE-79'
            },
            setTimeout: {
                pattern: /setTimeout\s*\(\s*["'].*["']/,
                type: 'code_injection',
                severity: 'medium',
                description: 'String argument to setTimeout',
                recommendation: 'Use function reference instead',
                cwe: 'CWE-95'
            },
            SQLInjection: {
                pattern: /query\s*\(\s*['"`].*\$\{.*\}.*['"`]/,
                type: 'sql_injection',
                severity: 'critical',
                description: 'SQL query uses template literals',
                recommendation: 'Use parameterized queries',
                cwe: 'CWE-89'
            }
        };
    }
    
    loadDOMSinks() {
        // Dangerous DOM properties and methods
        return {
            properties: [
                'innerHTML', 'outerHTML', 'insertAdjacentHTML',
                'document.write', 'document.writeln'
            ],
            methods: [
                'eval', 'setTimeout', 'setInterval', 'Function',
                'execScript', 'setImmediate'
            ],
            jQuery: [
                'html', 'append', 'prepend', 'after', 'before',
                'replaceWith', 'replaceAll'
            ]
        };
    }
    
    async analyzeProject() {
        this.findings = [];
        
        // Get all JavaScript/TypeScript files
        const files = await this.getAllJSFiles(this.projectPath);
        
        for (const file of files) {
            if (file.includes('node_modules') || file.includes('test')) {
                continue;
            }
            
            await this.analyzeFile(file);
        }
        
        // Run additional security tools
        await this.runESLintSecurity();
        await this.runNpmAudit();
        await this.runRetireJS();
        
        return this.findings;
    }
    
    async analyzeFile(filePath) {
        try {
            const content = await fs.readFile(filePath, 'utf8');
            
            // Parse AST
            const ast = parser.parse(content, {
                sourceType: 'module',
                plugins: ['jsx', 'typescript']
            });
            
            // Run analyzers
            this.analyzeCodeInjection(ast, filePath, content);
            this.analyzeXSS(ast, filePath, content);
            this.analyzeSQLInjection(ast, filePath, content);
            this.analyzeInsecureRandomness(ast, filePath, content);
            this.analyzeHardcodedSecrets(content, filePath);
            
        } catch (error) {
            console.error(`Error analyzing ${filePath}:`, error);
        }
    }
    
    analyzeCodeInjection(ast, filePath, content) {
        const self = this;
        
        traverse(ast, {
            CallExpression(path) {
                const node = path.node;
                
                // Check for eval()
                if (node.callee.name === 'eval') {
                    self.findings.push(new SecurityFinding(
                        'code_injection',
                        'critical',
                        filePath,
                        node.loc.start.line,
                        self.getCodeSnippet(content, node.loc),
                        'Use of eval() can execute arbitrary code',
                        'Avoid eval() and use JSON.parse() or other safe alternatives',
                        'CWE-95'
                    ));
                }
                
                // Check for Function constructor
                if (node.callee.name === 'Function' ||
                    (node.callee.type === 'MemberExpression' &&
                     node.callee.object.name === 'window' &&
                     node.callee.property.name === 'Function')) {
                    self.findings.push(new SecurityFinding(
                        'code_injection',
                        'critical',
                        filePath,
                        node.loc.start.line,
                        self.getCodeSnippet(content, node.loc),
                        'Function constructor can execute arbitrary code',
                        'Avoid Function constructor',
                        'CWE-95'
                    ));
                }
                
                // Check setTimeout/setInterval with strings
                if (['setTimeout', 'setInterval'].includes(node.callee.name)) {
                    if (node.arguments[0] && node.arguments[0].type === 'StringLiteral') {
                        self.findings.push(new SecurityFinding(
                            'code_injection',
                            'high',
                            filePath,
                            node.loc.start.line,
                            self.getCodeSnippet(content, node.loc),
                            `String argument to ${node.callee.name} can execute code`,
                            'Use function reference instead of string',
                            'CWE-95'
                        ));
                    }
                }
            }
        });
    }
    
    analyzeXSS(ast, filePath, content) {
        const self = this;
        
        traverse(ast, {
            AssignmentExpression(path) {
                const node = path.node;
                
                // Check for innerHTML assignment
                if (node.left.type === 'MemberExpression' &&
                    node.left.property.name === 'innerHTML') {
                    
                    // Check if right side contains user input
                    if (self.containsUserInput(node.right)) {
                        self.findings.push(new SecurityFinding(
                            'xss',
                            'critical',
                            filePath,
                            node.loc.start.line,
                            self.getCodeSnippet(content, node.loc),
                            'innerHTML assignment with user input can lead to XSS',
                            'Use textContent or sanitize HTML with DOMPurify',
                            'CWE-79'
                        ));
                    }
                }
            },
            
            CallExpression(path) {
                const node = path.node;
                
                // Check for document.write
                if (node.callee.type === 'MemberExpression' &&
                    node.callee.object.name === 'document' &&
                    node.callee.property.name === 'write') {
                    
                    self.findings.push(new SecurityFinding(
                        'xss',
                        'high',
                        filePath,
                        node.loc.start.line,
                        self.getCodeSnippet(content, node.loc),
                        'document.write can lead to XSS vulnerabilities',
                        'Use createElement and appendChild instead',
                        'CWE-79'
                    ));
                }
                
                // Check jQuery methods
                if (node.callee.type === 'MemberExpression' &&
                    self.domSinks.jQuery.includes(node.callee.property.name)) {
                    
                    if (self.containsUserInput(node.arguments[0])) {
                        self.findings.push(new SecurityFinding(
                            'xss',
                            'high',
                            filePath,
                            node.loc.start.line,
                            self.getCodeSnippet(content, node.loc),
                            `jQuery ${node.callee.property.name}() with user input`,
                            'Sanitize HTML or use text() method',
                            'CWE-79'
                        ));
                    }
                }
            }
        });
    }
    
    analyzeSQLInjection(ast, filePath, content) {
        const self = this;
        
        traverse(ast, {
            CallExpression(path) {
                const node = path.node;
                
                // Look for database query methods
                const queryMethods = ['query', 'execute', 'exec', 'run'];
                
                if (node.callee.type === 'MemberExpression' &&
                    queryMethods.includes(node.callee.property.name)) {
                    
                    // Check first argument (query string)
                    const queryArg = node.arguments[0];
                    
                    if (queryArg) {
                        // Check for template literals with expressions
                        if (queryArg.type === 'TemplateLiteral' &&
                            queryArg.expressions.length > 0) {
                            
                            self.findings.push(new SecurityFinding(
                                'sql_injection',
                                'critical',
                                filePath,
                                node.loc.start.line,
                                self.getCodeSnippet(content, node.loc),
                                'SQL query uses template literal with expressions',
                                'Use parameterized queries or prepared statements',
                                'CWE-89'
                            ));
                        }
                        
                        // Check for string concatenation
                        if (queryArg.type === 'BinaryExpression' &&
                            queryArg.operator === '+') {
                            
                            self.findings.push(new SecurityFinding(
                                'sql_injection',
                                'critical',
                                filePath,
                                node.loc.start.line,
                                self.getCodeSnippet(content, node.loc),
                                'SQL query uses string concatenation',
                                'Use parameterized queries or prepared statements',
                                'CWE-89'
                            ));
                        }
                    }
                }
            }
        });
    }
    
    analyzeInsecureRandomness(ast, filePath, content) {
        const self = this;
        
        traverse(ast, {
            CallExpression(path) {
                const node = path.node;
                
                // Check for Math.random() in security contexts
                if (node.callee.type === 'MemberExpression' &&
                    node.callee.object.name === 'Math' &&
                    node.callee.property.name === 'random') {
                    
                    // Try to determine context
                    const parent = path.parent;
                    const context = self.getContext(path);
                    
                    if (context.includes('token') || 
                        context.includes('password') ||
                        context.includes('key') ||
                        context.includes('secret')) {
                        
                        self.findings.push(new SecurityFinding(
                            'insecure_random',
                            'high',
                            filePath,
                            node.loc.start.line,
                            self.getCodeSnippet(content, node.loc),
                            'Math.random() is not cryptographically secure',
                            'Use crypto.randomBytes() for security-sensitive randomness',
                            'CWE-338'
                        ));
                    }
                }
            }
        });
    }
    
    analyzeHardcodedSecrets(content, filePath) {
        const secretPatterns = [
            {
                pattern: /["']?password["']?\s*[:=]\s*["'][^"']+["']/gi,
                type: 'password'
            },
            {
                pattern: /["']?api[_-]?key["']?\s*[:=]\s*["'][^"']+["']/gi,
                type: 'API key'
            },
            {
                pattern: /["']?secret["']?\s*[:=]\s*["'][^"']+["']/gi,
                type: 'secret'
            },
            {
                pattern: /["']?token["']?\s*[:=]\s*["'][^"']+["']/gi,
                type: 'token'
            },
            {
                pattern: /mongodb:\/\/[^:]+:[^@]+@/gi,
                type: 'MongoDB connection string'
            },
            {
                pattern: /postgres:\/\/[^:]+:[^@]+@/gi,
                type: 'PostgreSQL connection string'
            }
        ];
        
        const lines = content.split('\n');
        
        lines.forEach((line, index) => {
            // Skip comments and obvious non-secrets
            if (line.trim().startsWith('//') || 
                line.includes('process.env') ||
                line.includes('example') ||
                line.includes('placeholder')) {
                return;
            }
            
            secretPatterns.forEach(({ pattern, type }) => {
                if (pattern.test(line)) {
                    this.findings.push(new SecurityFinding(
                        'hardcoded_secret',
                        'high',
                        filePath,
                        index + 1,
                        line.trim(),
                        `Hardcoded ${type} detected`,
                        'Use environment variables or secure key management',
                        'CWE-798'
                    ));
                }
            });
        });
    }
    
    containsUserInput(node) {
        if (!node) return false;
        
        // Check for common user input sources
        const userInputSources = [
            'req.body', 'req.query', 'req.params',
            'request.body', 'request.query', 'request.params',
            'location.search', 'location.hash',
            'window.location', 'document.location',
            'localStorage', 'sessionStorage',
            'document.cookie'
        ];
        
        // Convert node to string for simple check
        const nodeStr = this.nodeToString(node);
        
        return userInputSources.some(source => nodeStr.includes(source));
    }
    
    nodeToString(node) {
        // Simple node to string conversion
        if (node.type === 'Identifier') {
            return node.name;
        } else if (node.type === 'MemberExpression') {
            return `${this.nodeToString(node.object)}.${node.property.name}`;
        } else if (node.type === 'StringLiteral') {
            return node.value;
        }
        return '';
    }
    
    getContext(path) {
        // Get surrounding context to understand usage
        let context = '';
        let current = path;
        
        for (let i = 0; i < 3 && current.parent; i++) {
            if (current.parent.type === 'VariableDeclarator') {
                context += current.parent.id.name + ' ';
            } else if (current.parent.type === 'AssignmentExpression') {
                context += this.nodeToString(current.parent.left) + ' ';
            }
            current = current.parentPath;
        }
        
        return context.toLowerCase();
    }
    
    getCodeSnippet(content, loc) {
        const lines = content.split('\n');
        const startLine = Math.max(0, loc.start.line - 2);
        const endLine = Math.min(lines.length, loc.end.line + 1);
        
        return lines.slice(startLine, endLine).join('\n');
    }
    
    async getAllJSFiles(dir, files = []) {
        const entries = await fs.readdir(dir, { withFileTypes: true });
        
        for (const entry of entries) {
            const fullPath = path.join(dir, entry.name);
            
            if (entry.isDirectory() && 
                !entry.name.startsWith('.') &&
                entry.name !== 'node_modules') {
                await this.getAllJSFiles(fullPath, files);
            } else if (entry.isFile() && 
                      (entry.name.endsWith('.js') || 
                       entry.name.endsWith('.jsx') ||
                       entry.name.endsWith('.ts') ||
                       entry.name.endsWith('.tsx'))) {
                files.push(fullPath);
            }
        }
        
        return files;
    }
    
    async runESLintSecurity() {
        try {
            // Run ESLint with security plugin
            const result = execSync(
                `npx eslint ${this.projectPath} --plugin security --format json`,
                { encoding: 'utf8', stdio: 'pipe' }
            );
            
            const eslintResults = JSON.parse(result);
            
            // Process ESLint findings
            for (const file of eslintResults) {
                for (const message of file.messages) {
                    if (message.ruleId && message.ruleId.startsWith('security/')) {
                        this.findings.push(new SecurityFinding(
                            this.mapESLintRule(message.ruleId),
                            message.severity === 2 ? 'high' : 'medium',
                            file.filePath,
                            message.line,
                            message.source,
                            message.message,
                            'Follow ESLint security recommendations',
                            'CWE-' + (message.ruleId.split('/')[1] || 'unknown')
                        ));
                    }
                }
            }
        } catch (error) {
            console.error('ESLint security check failed:', error.message);
        }
    }
    
    mapESLintRule(ruleId) {
        const mapping = {
            'security/detect-eval-with-expression': 'code_injection',
            'security/detect-non-literal-regexp': 'regex_injection',
            'security/detect-non-literal-require': 'code_injection',
            'security/detect-object-injection': 'injection',
            'security/detect-possible-timing-attack': 'timing_attack',
            'security/detect-pseudoRandomBytes': 'insecure_random',
            'security/detect-unsafe-regex': 'redos',
            'security/detect-child-process': 'command_injection'
        };
        
        return mapping[ruleId] || 'security_issue';
    }
}

// Security-focused testing framework
/**
 * Runs canned attack payloads against application functions and records
 * which ones appear to succeed. Every test method appends its result object
 * to `this.results` and also returns it.
 */
class SecurityTestRunner {
    constructor() {
        // Result objects of every test run so far, in call order
        this.results = [];
    }

    /**
     * Turn any thrown value into message text.
     *
     * Code under test may throw strings or other non-Error values, which
     * have no `message` property.
     *
     * @param {*} error - The caught value.
     * @returns {string} `error.message` when it is a string, else `String(error)`.
     */
    _describeError(error) {
        if (error != null && typeof error.message === 'string') {
            return error.message;
        }
        return String(error);
    }

    /**
     * Feed XSS payloads to a render function and look for unescaped output.
     *
     * A payload counts as a failure when it appears verbatim in the output,
     * or when the output contains live markup: a `<script` tag, a tag with
     * an inline `on...=` event handler, or a tag carrying a `javascript:`
     * URL. Text such as `alert(` that survives inside correctly escaped
     * output is not a failure on its own.
     *
     * @param {function(string): (string|Promise<string>)} renderFunction
     *     Function under test; receives one payload and returns rendered text.
     * @returns {Promise<object>} Result with `function`, `vulnerable`,
     *     `testedPayloads`, `failedPayloads`, and `errors` when the render
     *     function threw for at least one payload.
     */
    async testXSSProtection(renderFunction) {
        const xssPayloads = [
            '<script>alert("XSS")</script>',
            '<img src=x onerror=alert(1)>',
            '<svg onload=alert(1)>',
            'javascript:alert(1)',
            '<iframe src=javascript:alert(1)>',
            '<body onload=alert(1)>',
            '<input autofocus onfocus=alert(1)>',
            '<select autofocus onfocus=alert(1)>',
            '<textarea autofocus onfocus=alert(1)>',
            '<keygen autofocus onfocus=alert(1)>',
            '<video><source onerror=alert(1)>',
            '<audio src=x onerror=alert(1)>',
            '<details open ontoggle=alert(1)>',
            '<marquee onstart=alert(1)>'
        ];

        // Matches markup that is still live, i.e. introduced by a raw `<`.
        // Escaped output (`&lt;img onerror=...`) has no raw `<` and so does
        // not match. No `g` flag, so `.test()` carries no state between calls.
        const liveMarkup =
            /<\s*script\b|<[a-z][^>]*\son[a-z]+\s*=|<[a-z][^>]*javascript:/i;

        const results = {
            function: renderFunction.name,
            vulnerable: false,
            testedPayloads: xssPayloads.length,
            failedPayloads: []
        };

        for (const payload of xssPayloads) {
            try {
                const output = await renderFunction(payload);
                // Renderers may return null or a non-string; normalise
                // rather than throwing on `.includes`
                const text = output == null ? '' : String(output);

                if (text.includes(payload) || liveMarkup.test(text)) {
                    results.vulnerable = true;
                    results.failedPayloads.push({
                        payload,
                        output: text.substring(0, 100)
                    });
                }
            } catch (error) {
                // A crash is not proof of XSS, but it should not vanish
                // silently either; record it for the caller to review.
                results.errors = results.errors || [];
                results.errors.push({
                    payload,
                    error: this._describeError(error)
                });
            }
        }

        this.results.push(results);
        return results;
    }

    /**
     * Feed SQL injection payloads to a query function.
     *
     * A payload counts as a failure when the function resolves to a
     * non-empty array. Thrown errors are treated as the query being
     * rejected; those whose message does not mention `syntax` are collected
     * in `errors` for review.
     *
     * @param {function(string): Promise<*>} queryFunction - Function under test.
     * @returns {Promise<object>} Result with `function`, `vulnerable`,
     *     `testedPayloads`, `failedPayloads`, and `errors` when any
     *     non-syntax error occurred.
     */
    async testSQLInjection(queryFunction) {
        const sqlPayloads = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "' UNION SELECT * FROM users--",
            "admin'--",
            "' OR 1=1--",
            "1'; WAITFOR DELAY '00:00:05'--",
            "' AND (SELECT * FROM users)--",
            "' OR EXISTS(SELECT * FROM users WHERE username='admin')--"
        ];

        const results = {
            function: queryFunction.name,
            vulnerable: false,
            testedPayloads: sqlPayloads.length,
            failedPayloads: []
        };

        for (const payload of sqlPayloads) {
            try {
                const result = await queryFunction(payload);

                // Rows coming back for an attack string suggest the payload
                // changed the meaning of the query
                if (Array.isArray(result) && result.length > 0) {
                    results.vulnerable = true;
                    results.failedPayloads.push(payload);
                }
            } catch (error) {
                // Good - query failed. A syntax error is the expected
                // outcome; anything else is kept because it might still
                // indicate a problem.
                const message = this._describeError(error);
                if (!message.includes('syntax')) {
                    results.errors = results.errors || [];
                    results.errors.push({
                        payload,
                        error: message
                    });
                }
            }
        }

        this.results.push(results);
        return results;
    }

    /**
     * Try well-known authentication bypass inputs against an auth function.
     *
     * Any truthy return value counts as a successful bypass. Thrown errors
     * are treated as the login being rejected.
     *
     * @param {function(*, *): Promise<*>} authFunction - Function under
     *     test; called with `(username, password)`.
     * @returns {Promise<object>} Result with `function`, `vulnerable`,
     *     `testedPayloads` and `successfulBypasses`.
     */
    async testAuthenticationBypass(authFunction) {
        const bypassPayloads = [
            { username: 'admin', password: "' OR '1'='1" },
            { username: "admin'--", password: 'anything' },
            { username: 'admin', password: null },
            { username: null, password: null },
            { username: '', password: '' },
            { username: 'admin\x00', password: 'wrong' },
            { username: 'admin', password: { $ne: null } }, // NoSQL
            { username: { $gt: '' }, password: { $gt: '' } }, // NoSQL
            { username: 'admin', password: undefined },
            { username: ['admin'], password: 'wrong' } // Type confusion
        ];

        const results = {
            function: authFunction.name,
            vulnerable: false,
            testedPayloads: bypassPayloads.length,
            successfulBypasses: []
        };

        for (const payload of bypassPayloads) {
            try {
                const result = await authFunction(
                    payload.username,
                    payload.password
                );

                // If returns truthy value, authentication was bypassed
                if (result) {
                    results.vulnerable = true;
                    results.successfulBypasses.push(payload);
                }
            } catch (error) {
                // Good - authentication failed
            }
        }

        this.results.push(results);
        return results;
    }
}

Implementing comprehensive security testing and code analysis is essential for identifying vulnerabilities before they reach production. By combining static analysis, dynamic testing, and security-focused unit tests, development teams can build more secure applications while maintaining development velocity. Regular security testing should be integrated into CI/CD pipelines to ensure ongoing protection.

## Security in Development Workflow

Integrating security into the development workflow transforms security from a gate at the end of development to an integral part of the entire software development lifecycle. This DevSecOps approach ensures that security considerations are addressed early and continuously, reducing both the cost and impact of vulnerabilities. This chapter explores comprehensive strategies for embedding security practices into every stage of development, from initial design through deployment and maintenance.