Secure Dependency Management Practices

Managing dependencies securely requires a comprehensive approach that includes careful package selection, continuous monitoring, and automated security checks. Both Python and JavaScript ecosystems provide tools and best practices for dependency management, but developers must actively implement these measures to maintain security.

# Python - Secure Dependency Management
import ast
import base64
import hashlib
import json
import os
import re
import subprocess
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set

import pkg_resources
import requests
import toml
from packaging import version

class PythonDependencyManager:
    """Audit and harden a Python project's dependency tree.

    Combines external tooling (``pip-audit``, ``pip list --outdated``) with
    inspection of installed distribution metadata to report vulnerabilities,
    outdated packages, tampered files and suspicious code patterns.
    """

    def __init__(self, project_path: str):
        # Locations of the manifest files this project may use; only
        # project_path itself is required to exist.
        self.project_path = project_path
        self.requirements_file = os.path.join(project_path, 'requirements.txt')
        self.pipfile = os.path.join(project_path, 'Pipfile')
        self.pyproject_toml = os.path.join(project_path, 'pyproject.toml')

    def audit_dependencies(self) -> Dict[str, List[Dict]]:
        """Comprehensive dependency security audit.

        Returns:
            Mapping with severity buckets ('critical', 'high', 'medium',
            'low') plus 'outdated' and 'integrity' report lists.
        """
        vulnerabilities: Dict[str, List[Dict]] = {
            'critical': [],
            'high': [],
            'medium': [],
            'low': []
        }

        # Run pip-audit for vulnerability scanning.  pip-audit exits with a
        # non-zero status precisely when it FINDS vulnerabilities, so the
        # report must be parsed from stdout whenever JSON is present --
        # gating on returncode == 0 (as this code previously did) silently
        # discarded every genuine finding.
        try:
            result = subprocess.run(
                ['pip-audit', '--format', 'json', '--desc'],
                capture_output=True,
                text=True,
                cwd=self.project_path
            )

            if result.stdout.strip():
                audit_data = json.loads(result.stdout)
                # Newer pip-audit releases wrap the report in a
                # {'dependencies': [...]} object; older ones emit a bare
                # list.  Accept both shapes.
                if isinstance(audit_data, dict):
                    audit_data = audit_data.get('dependencies', [])
                for vuln in audit_data:
                    severity = self._classify_severity(vuln.get('fix_versions', []))
                    vulnerabilities[severity].append({
                        'package': vuln['name'],
                        'installed_version': vuln['version'],
                        'vulnerability': vuln.get('vulnerability'),
                        'description': vuln.get('description', ''),
                        'fix_versions': vuln.get('fix_versions', [])
                    })
        except (OSError, json.JSONDecodeError, KeyError) as e:
            # pip-audit not installed, or output unparseable; continue with
            # the remaining checks rather than aborting the whole audit.
            print(f"pip-audit failed: {e}")

        # Check for outdated packages (which may include security fixes).
        vulnerabilities['outdated'] = self._check_outdated_packages()

        # Verify installed files still match their recorded hashes.
        vulnerabilities['integrity'] = self._verify_package_integrity()

        return vulnerabilities

    def _classify_severity(self, fix_versions: List[str]) -> str:
        """Bucket a finding into one of the severity keys.

        Previously referenced but never defined.  pip-audit's JSON report
        carries no CVSS score, so this is a coarse heuristic: a
        vulnerability with no published fix cannot be remediated by
        upgrading and ranks 'high'; anything fixable ranks 'medium'.
        Replace with real advisory severities when available.
        """
        return 'high' if not fix_versions else 'medium'

    def _check_outdated_packages(self) -> List[Dict]:
        """Check for outdated packages that might have security fixes."""
        outdated: List[Dict] = []

        try:
            result = subprocess.run(
                ['pip', 'list', '--outdated', '--format', 'json'],
                capture_output=True,
                text=True
            )

            if result.returncode == 0:
                for pkg in json.loads(result.stdout):
                    # Only report updates that look security-relevant.
                    if self._is_security_update(pkg['name'],
                                                pkg['version'],
                                                pkg['latest_version']):
                        outdated.append({
                            'package': pkg['name'],
                            'current': pkg['version'],
                            'latest': pkg['latest_version'],
                            'type': pkg.get('latest_filetype', 'wheel')
                        })
        except Exception as e:
            # Best effort: a missing pip or malformed output should not
            # abort the wider audit.
            print(f"Failed to check outdated packages: {e}")

        return outdated

    def _is_security_update(self, name: str, current: str, latest: str) -> bool:
        """Return True when *latest* is a genuine upgrade over *current*.

        Previously referenced but never defined.  Release notes are not
        available here, so security releases cannot be distinguished from
        ordinary ones; every real upgrade is flagged conservatively.
        Malformed version strings are treated as not an upgrade.
        """
        try:
            return version.parse(latest) > version.parse(current)
        except version.InvalidVersion:
            return False

    def _verify_package_integrity(self) -> List[Dict]:
        """Verify installed package files still match their RECORD hashes.

        Returns a list of ``{'package', 'file', 'expected_hash',
        'actual_hash'}`` entries for every file whose on-disk contents
        differ from the hash recorded at install time.
        """
        issues: List[Dict] = []

        for dist in pkg_resources.working_set:
            try:
                # RECORD is a CSV of (path, hash, size) written at install
                # time (PEP 376); distributions without one are skipped.
                metadata = dist.get_metadata('RECORD')

                for line in metadata.splitlines():
                    if not line.strip():
                        continue

                    parts = line.split(',')
                    if len(parts) < 3:
                        continue
                    file_path, expected_hash = parts[0], parts[1]

                    full_path = os.path.join(dist.location, file_path)
                    if expected_hash and os.path.exists(full_path):
                        # RECORD stores digests as urlsafe base64 without
                        # padding, NOT hex -- comparing against a hex digest
                        # (as this code previously did) flagged every file.
                        actual_hash = self._record_style_hash(full_path)
                        if actual_hash != expected_hash:
                            issues.append({
                                'package': dist.project_name,
                                'file': file_path,
                                'expected_hash': expected_hash,
                                'actual_hash': actual_hash
                            })
            except Exception:
                # Skip distributions without a readable RECORD file.
                continue

        return issues

    def _calculate_file_hash(self, file_path: str) -> str:
        """Return the file's SHA-256 digest formatted as 'sha256=<hex>'."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return f"sha256={sha256_hash.hexdigest()}"

    def _record_style_hash(self, file_path: str) -> str:
        """Return the file's SHA-256 digest in RECORD format (PEP 376/427):
        'sha256=' + urlsafe base64 digest with '=' padding stripped."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                sha256_hash.update(chunk)
        digest = base64.urlsafe_b64encode(sha256_hash.digest()).rstrip(b'=')
        return f"sha256={digest.decode('ascii')}"

    def generate_requirements_lock(self) -> str:
        """Generate locked requirements ('name==version', sorted), appending
        a ``--hash`` option where a fingerprint can be computed.

        NOTE(review): the appended hash fingerprints the locally installed
        RECORD, not the artifact published on the index; for true
        hash-checking mode generate hashes with ``pip hash`` against the
        downloaded wheel/sdist.
        """
        locked_requirements = []

        for dist in pkg_resources.working_set:
            requirement = f"{dist.project_name}=={dist.version}"

            try:
                # Only distributions exposing an egg-info/dist-info RECORD
                # can be fingerprinted; the pin alone is still useful.
                provider = getattr(dist, '_provider', None)
                egg_info = getattr(provider, 'egg_info', None)
                if egg_info and os.path.exists(os.path.join(egg_info, 'RECORD')):
                    requirement += f" \\\n    --hash=sha256:{self._get_dist_hash(dist)}"
            except Exception:
                pass

            locked_requirements.append(requirement)

        return '\n'.join(sorted(locked_requirements))

    def _get_dist_hash(self, dist) -> str:
        """Hex SHA-256 fingerprint of a distribution's RECORD file.

        Previously referenced but never defined.  Hashing RECORD (which
        itself lists per-file hashes) yields a stable fingerprint of the
        installed distribution.
        """
        record_path = os.path.join(dist._provider.egg_info, 'RECORD')
        # _calculate_file_hash returns 'sha256=<hex>'; strip the prefix so
        # callers can embed the bare hex digest after 'sha256:'.
        return self._calculate_file_hash(record_path).split('=', 1)[1]

    def check_package_permissions(self, package_name: str) -> Dict[str, Any]:
        """Statically analyze an installed package for security-relevant
        capabilities (network, filesystem, subprocess, dynamic code).

        Returns the permissions dict; on failure an 'error' key is added.
        """
        # 'dynamic_execution' was mapped by the analyzer but missing from
        # this dict, so eval/exec import hits were silently dropped.
        permissions: Dict[str, Any] = {
            'network_access': False,
            'file_system_access': False,
            'subprocess_execution': False,
            'dynamic_execution': False,
            'dynamic_imports': False,
            'suspicious_patterns': []
        }

        try:
            dist = pkg_resources.get_distribution(package_name)

            # dist.location is the site-packages directory; walking it whole
            # would scan every installed package.  Restrict the walk to the
            # top-level modules this distribution actually provides.
            try:
                top_levels = dist.get_metadata('top_level.txt').split()
            except Exception:
                top_levels = [package_name.replace('-', '_')]

            for top in top_levels:
                pkg_dir = os.path.join(dist.location, top)
                if not os.path.isdir(pkg_dir):
                    continue
                for root, _dirs, files in os.walk(pkg_dir):
                    for file in files:
                        if file.endswith('.py'):
                            file_path = os.path.join(root, file)
                            permissions = self._analyze_python_file(file_path, permissions)

        except Exception as e:
            permissions['error'] = str(e)

        return permissions

    def _analyze_python_file(self, file_path: str,
                             permissions: Dict) -> Dict:
        """Update *permissions* based on one file's AST.

        Flags dangerous imports, direct eval()/exec() calls, and string
        literals mentioning eval/exec (a weak obfuscation signal).
        Unreadable or unparseable files are skipped silently.
        """
        # Module/builtin name -> permission flag it implies.
        dangerous_imports = {
            'socket': 'network_access',
            'urllib': 'network_access',
            'requests': 'network_access',
            'subprocess': 'subprocess_execution',
            'os': 'file_system_access',
            'eval': 'dynamic_execution',
            'exec': 'dynamic_execution',
            '__import__': 'dynamic_imports'
        }

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                tree = ast.parse(f.read())

            for node in ast.walk(tree):
                # Plain `import X` statements.
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        perm_type = dangerous_imports.get(alias.name)
                        if perm_type in permissions:
                            permissions[perm_type] = True

                # Direct eval()/exec() calls.
                elif isinstance(node, ast.Call):
                    if isinstance(node.func, ast.Name) and node.func.id in ('eval', 'exec'):
                        permissions['suspicious_patterns'].append(
                            f"Dynamic code execution: {node.func.id}"
                        )

                # String literals mentioning eval/exec may hide getattr-style
                # tricks.  ast.Str/node.s are deprecated since 3.8; use
                # ast.Constant instead.
                elif isinstance(node, ast.Constant) and isinstance(node.value, str):
                    if 'eval' in node.value or 'exec' in node.value:
                        permissions['suspicious_patterns'].append(
                            "Possible obfuscated code execution"
                        )

        except (OSError, SyntaxError, ValueError, UnicodeDecodeError):
            # Non-UTF-8, unreadable, or syntactically invalid source.
            pass

        return permissions

class DependencyPinner:
    """Ensure all dependencies are pinned to specific versions.

    Unpinned requirements let a compromised or broken new release slip into
    a build; pinning keeps installs reproducible.
    """

    def __init__(self, requirements_file: str):
        self.requirements_file = requirements_file

    def pin_all_dependencies(self):
        """Rewrite the requirements file with every dependency pinned to the
        locally installed version.

        Comments, blank lines, pip options (``-r``, ``-e``, ``--hash`` ...)
        and already-pinned lines pass through unchanged; requirements that
        cannot be resolved locally are also left as-is.
        """
        pinned_requirements = []

        with open(self.requirements_file, 'r') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or line.startswith('-'):
                    pinned_requirements.append(line)
                    continue

                if '==' in line:
                    # Already pinned to an exact version.
                    pinned_requirements.append(line)
                    continue

                # Strip any PEP 508 specifier, extras, or environment marker
                # to recover the bare package name.  The previous split on
                # '>=' / '<=' / '~=' alone mishandled '>', '<', '!=',
                # '[extra]' and '; marker' forms.
                package_name = re.split(r'[><=!~;\[\s]', line, maxsplit=1)[0].strip()
                try:
                    dist = pkg_resources.get_distribution(package_name)
                    pinned_requirements.append(f"{package_name}=={dist.version}")
                except Exception:
                    # Not installed locally -- keep the original constraint.
                    pinned_requirements.append(line)

        # Write back in place.
        with open(self.requirements_file, 'w') as f:
            f.write('\n'.join(pinned_requirements))

class PrivatePackageRegistry:
    """Manage a private package registry for internal packages."""

    def __init__(self, registry_url: str, auth_token: str):
        self.registry_url = registry_url
        self.auth_token = auth_token

    def configure_pip(self):
        """Write a user-level pip.conf that resolves from the private
        registry first.

        SECURITY NOTE: keeping ``extra-index-url`` pointed at public PyPI
        leaves installs open to dependency confusion -- pip may prefer a
        same-named public package over the internal one.  Prefer a registry
        that proxies PyPI and drop the extra index where possible.
        """
        pip_conf = f"""
[global]
index-url = {self.registry_url}/simple/
trusted-host = {self.registry_url.split('//')[1].split('/')[0]}
extra-index-url = https://pypi.org/simple/

[install]
find-links = {self.registry_url}/packages/
"""

        # Write the per-user pip configuration.
        pip_dir = os.path.expanduser('~/.pip')
        os.makedirs(pip_dir, exist_ok=True)

        with open(os.path.join(pip_dir, 'pip.conf'), 'w') as f:
            f.write(pip_conf)

    def upload_package(self, package_path: str):
        """Upload a built distribution to the private registry via twine."""
        # Import both submodules explicitly: importing only
        # twine.commands.upload does not guarantee twine.settings is loaded,
        # and the previous code referenced it without importing it.
        import twine.commands.upload
        import twine.settings

        # __token__ + token password is twine's token-auth convention.
        settings = twine.settings.Settings(
            repository_url=f"{self.registry_url}/legacy/",
            username="__token__",
            password=self.auth_token
        )

        twine.commands.upload.upload(
            settings,
            [package_path]
        )

    def verify_package_source(self, package_name: str) -> bool:
        """Best-effort check that *package_name* came from this registry.

        NOTE(review): ``dist.location`` is a filesystem path (site-packages),
        so a registry URL will normally never appear in it; this check only
        catches direct/editable installs.  For a reliable answer inspect the
        distribution's ``direct_url.json`` or pip's install report.
        """
        try:
            dist = pkg_resources.get_distribution(package_name)
            return self.registry_url in dist.location
        except Exception:
            # Not installed, or metadata unavailable: cannot verify.
            return False
// JavaScript - Secure Dependency Management
const fs = require('fs').promises;
const path = require('path');
const crypto = require('crypto');
const { exec } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);

// Scans a Node project's dependency tree for known vulnerabilities,
// typosquats, tampered packages and suspicious install-time behaviour.
//
// NOTE(review): several methods referenced below (isSuspiciousVersion,
// findSimilarPopularPackage, getPackageStats, isSuspiciousStats,
// isExpectedToHaveNetwork, calculatePackageIntegrity, getNpmVersion) are
// never defined in this class -- calling the paths that reach them will
// throw TypeError at runtime.  Confirm whether they live in a subclass or
// were lost in an edit.
class NodeDependencyManager {
    constructor(projectPath) {
        // Canonical locations of the npm project files.
        this.projectPath = projectPath;
        this.packageJsonPath = path.join(projectPath, 'package.json');
        this.packageLockPath = path.join(projectPath, 'package-lock.json');
        this.nodeModulesPath = path.join(projectPath, 'node_modules');
    }
    
    // Run `npm audit` plus local heuristics; returns findings grouped by
    // npm severity level, with extra `suspicious`/`integrity` lists added.
    async auditDependencies() {
        const vulnerabilities = {
            critical: [],
            high: [],
            moderate: [],
            low: [],
            info: []
        };
        
        try {
            // Run npm audit
            // NOTE(review): `npm audit` exits non-zero exactly when it finds
            // vulnerabilities, which makes execAsync reject -- so control
            // jumps to the catch block and the findings are discarded in the
            // one case that matters.  Parse error.stdout as well, or query
            // the registry audit endpoint directly.  TODO confirm against
            // the npm version in use.
            const { stdout } = await execAsync('npm audit --json', {
                cwd: this.projectPath
            });
            
            const auditData = JSON.parse(stdout);
            
            // Process vulnerabilities
            if (auditData.vulnerabilities) {
                for (const [pkgName, vuln] of Object.entries(auditData.vulnerabilities)) {
                    const severity = vuln.severity;
                    
                    // NOTE(review): a severity value outside the five keys
                    // above leaves vulnerabilities[severity] undefined and
                    // this push throws.
                    vulnerabilities[severity].push({
                        package: pkgName,
                        severity: vuln.severity,
                        vulnerableVersions: vuln.range,
                        recommendation: vuln.fixAvailable,
                        paths: vuln.nodes
                    });
                }
            }
            
            // Check for suspicious packages
            const suspiciousPackages = await this.detectSuspiciousPackages();
            vulnerabilities.suspicious = suspiciousPackages;
            
            // Verify package integrity
            const integrityIssues = await this.verifyPackageIntegrity();
            vulnerabilities.integrity = integrityIssues;
            
        } catch (error) {
            // Best effort: report what was collected before the failure.
            console.error('Audit failed:', error);
        }
        
        return vulnerabilities;
    }
    
    // Heuristic scan of declared dependencies (package.json) and installed
    // packages for typosquats, odd versions, and risky install scripts.
    async detectSuspiciousPackages() {
        const suspicious = [];
        const packageJson = JSON.parse(
            await fs.readFile(this.packageJsonPath, 'utf8')
        );
        
        // Production and dev dependencies are screened identically.
        const allDependencies = {
            ...packageJson.dependencies,
            ...packageJson.devDependencies
        };
        
        for (const [name, version] of Object.entries(allDependencies)) {
            // Check for typosquatting
            if (this.isPossibleTyposquat(name)) {
                suspicious.push({
                    package: name,
                    reason: 'Possible typosquatting',
                    // NOTE(review): findSimilarPopularPackage is not defined
                    // in this class.
                    similarTo: this.findSimilarPopularPackage(name)
                });
            }
            
            // Check for suspicious version patterns
            // NOTE(review): isSuspiciousVersion is not defined in this class.
            if (this.isSuspiciousVersion(version)) {
                suspicious.push({
                    package: name,
                    reason: 'Suspicious version pattern',
                    version
                });
            }
            
            // Check package age and downloads
            // NOTE(review): getPackageStats / isSuspiciousStats are not
            // defined in this class; presumably they query the registry's
            // download-counts API -- confirm.
            const stats = await this.getPackageStats(name);
            if (stats && this.isSuspiciousStats(stats)) {
                suspicious.push({
                    package: name,
                    reason: 'Low downloads or recently published',
                    stats
                });
            }
        }
        
        // Scan for suspicious patterns in installed packages
        const installedSuspicious = await this.scanInstalledPackages();
        suspicious.push(...installedSuspicious);
        
        return suspicious;
    }
    
    // True when packageName is one edit away from, or a common variation
    // of, a well-known package name.
    isPossibleTyposquat(packageName) {
        // Small illustrative sample; a production list would be far larger.
        const popularPackages = [
            'express', 'react', 'lodash', 'axios', 'moment',
            'webpack', 'babel', 'typescript', 'jest', 'eslint'
        ];
        
        // Check for common typosquatting patterns
        for (const popular of popularPackages) {
            // Character substitution: edit distance of exactly 1 means a
            // single typo turns one name into the other.
            const distance = this.levenshteinDistance(packageName, popular);
            if (distance === 1) {
                return true;
            }
            
            // Common misspellings
            const variations = [
                popular.replace(/s$/, 'z'),  // pluralization
                popular.replace(/z$/, 's'),
                popular + '-js',
                popular + 'js',
                popular.replace(/-/g, ''),
                popular.replace(/([a-z])([A-Z])/g, '$1-$2').toLowerCase()
            ];
            
            if (variations.includes(packageName)) {
                return true;
            }
        }
        
        return false;
    }
    
    // Classic dynamic-programming edit distance: cost of turning str1 into
    // str2 via single-character inserts, deletes and substitutions.
    levenshteinDistance(str1, str2) {
        const matrix = [];
        
        // First column: deleting i characters from str2.
        for (let i = 0; i <= str2.length; i++) {
            matrix[i] = [i];
        }
        
        // First row: inserting j characters of str1.
        for (let j = 0; j <= str1.length; j++) {
            matrix[0][j] = j;
        }
        
        for (let i = 1; i <= str2.length; i++) {
            for (let j = 1; j <= str1.length; j++) {
                if (str2.charAt(i - 1) === str1.charAt(j - 1)) {
                    matrix[i][j] = matrix[i - 1][j - 1];
                } else {
                    // Min over substitute / insert / delete.
                    matrix[i][j] = Math.min(
                        matrix[i - 1][j - 1] + 1,
                        matrix[i][j - 1] + 1,
                        matrix[i - 1][j] + 1
                    );
                }
            }
        }
        
        return matrix[str2.length][str1.length];
    }
    
    // Walk node_modules looking for lifecycle scripts and unexpected
    // network code in installed packages.
    async scanInstalledPackages() {
        const suspicious = [];
        
        try {
            const packages = await fs.readdir(this.nodeModulesPath);
            
            for (const pkg of packages) {
                // NOTE(review): skipping '@' directories means scoped
                // packages (@scope/name) are never scanned -- they would
                // need one more directory level of recursion.
                if (pkg.startsWith('.') || pkg.startsWith('@')) continue;
                
                const pkgPath = path.join(this.nodeModulesPath, pkg);
                const pkgJsonPath = path.join(pkgPath, 'package.json');
                
                try {
                    const pkgJson = JSON.parse(
                        await fs.readFile(pkgJsonPath, 'utf8')
                    );
                    
                    // Check for suspicious scripts
                    if (pkgJson.scripts) {
                        // Install/uninstall hooks run arbitrary code on the
                        // developer's machine -- the classic supply-chain
                        // attack vector.
                        const suspiciousScripts = [
                            'preinstall', 'postinstall', 'preuninstall', 'postuninstall'
                        ];
                        
                        for (const script of suspiciousScripts) {
                            if (pkgJson.scripts[script]) {
                                const scriptContent = pkgJson.scripts[script];
                                
                                // Check for suspicious patterns
                                if (this.containsSuspiciousCode(scriptContent)) {
                                    suspicious.push({
                                        package: pkg,
                                        reason: `Suspicious ${script} script`,
                                        script: scriptContent
                                    });
                                }
                            }
                        }
                    }
                    
                    // Check for unexpected network access
                    // NOTE(review): isExpectedToHaveNetwork is not defined
                    // in this class.
                    const hasNetworkCode = await this.scanForNetworkAccess(pkgPath);
                    if (hasNetworkCode && !this.isExpectedToHaveNetwork(pkg)) {
                        suspicious.push({
                            package: pkg,
                            reason: 'Unexpected network access code'
                        });
                    }
                    
                } catch (error) {
                    // Skip packages without package.json
                }
            }
        } catch (error) {
            console.error('Package scan failed:', error);
        }
        
        return suspicious;
    }
    
    // Regex screen for patterns common in malicious lifecycle scripts:
    // dynamic evaluation, shelling out, env-var harvesting, and base64 /
    // Buffer decoding used to hide payloads.  Heuristic only -- legitimate
    // scripts can match (e.g. any use of process.env).
    containsSuspiciousCode(code) {
        const suspiciousPatterns = [
            /eval\s*\(/,
            /Function\s*\(/,
            /child_process/,
            /\.exec\s*\(/,
            /crypto\s*\.\s*createCipher/,
            /process\s*\.\s*env/,
            /require\s*\(\s*['"`]fs['"`]\s*\)/,
            /\bBuffer\s*\.\s*from\s*\(/,
            /atob\s*\(/,
            /btoa\s*\(/
        ];
        
        return suspiciousPatterns.some(pattern => pattern.test(code));
    }
    
    // True when any JS file under packagePath references a core network
    // module or a well-known HTTP client package.  Substring matching, so
    // mentions inside comments or strings also count.
    async scanForNetworkAccess(packagePath) {
        const networkModules = ['http', 'https', 'net', 'dgram', 'dns'];
        const networkPackages = ['axios', 'request', 'node-fetch', 'got'];
        
        try {
            // Scan all JS files
            const files = await this.getAllJSFiles(packagePath);
            
            for (const file of files) {
                const content = await fs.readFile(file, 'utf8');
                
                // Check for network module imports
                for (const module of networkModules) {
                    if (content.includes(`require('${module}')`) ||
                        content.includes(`require("${module}")`) ||
                        content.includes(`from '${module}'`) ||
                        content.includes(`from "${module}"`)) {
                        return true;
                    }
                }
                
                // Check for network package usage
                for (const pkg of networkPackages) {
                    if (content.includes(`require('${pkg}')`) ||
                        content.includes(`require("${pkg}")`)) {
                        return true;
                    }
                }
            }
        } catch (error) {
            // Ignore scan errors
        }
        
        return false;
    }
    
    // Recursively collect .js file paths under dir, skipping nested
    // node_modules trees.  Appends into (and returns) the shared `files`
    // accumulator.
    async getAllJSFiles(dir, files = []) {
        const entries = await fs.readdir(dir, { withFileTypes: true });
        
        for (const entry of entries) {
            const fullPath = path.join(dir, entry.name);
            
            if (entry.isDirectory() && entry.name !== 'node_modules') {
                await this.getAllJSFiles(fullPath, files);
            } else if (entry.isFile() && entry.name.endsWith('.js')) {
                files.push(fullPath);
            }
        }
        
        return files;
    }
    
    // Compare installed packages against the integrity hashes recorded in
    // package-lock.json; returns a list of mismatches.
    async verifyPackageIntegrity() {
        const issues = [];
        
        try {
            // Read package-lock.json
            const lockFile = JSON.parse(
                await fs.readFile(this.packageLockPath, 'utf8')
            );
            
            // Verify each package
            // NOTE(review): lockfile v2/v3 keeps entries under `packages`,
            // not `dependencies` -- on modern npm this loop may iterate
            // nothing.  TODO confirm the lockfile version in use.
            for (const [name, data] of Object.entries(lockFile.dependencies || {})) {
                if (data.integrity) {
                    const packagePath = path.join(this.nodeModulesPath, name);
                    
                    try {
                        // Calculate actual integrity
                        // NOTE(review): calculatePackageIntegrity is not
                        // defined in this class; also, a lockfile integrity
                        // hash covers the registry tarball, not the unpacked
                        // node_modules tree -- recomputing it from installed
                        // files needs repacking.  Confirm intended design.
                        const actualIntegrity = await this.calculatePackageIntegrity(packagePath);
                        
                        if (actualIntegrity !== data.integrity) {
                            issues.push({
                                package: name,
                                expected: data.integrity,
                                actual: actualIntegrity
                            });
                        }
                    } catch (error) {
                        // Package might not be installed
                    }
                }
            }
        } catch (error) {
            console.error('Integrity verification failed:', error);
        }
        
        return issues;
    }
    
    // Regenerate package-lock.json and stamp it with audit metadata.
    async generateLockFileWithIntegrity() {
        // Force regeneration of lock file with integrity hashes
        await execAsync('npm install --package-lock-only', {
            cwd: this.projectPath
        });
        
        // Add additional security metadata
        const lockFile = JSON.parse(
            await fs.readFile(this.packageLockPath, 'utf8')
        );
        
        // NOTE(review): `_meta` is not a standard lockfile field; npm will
        // drop it on the next rewrite.  getNpmVersion is also not defined
        // in this class.
        lockFile._meta = {
            ...lockFile._meta,
            securityAudit: {
                timestamp: new Date().toISOString(),
                nodeVersion: process.version,
                npmVersion: await this.getNpmVersion()
            }
        };
        
        await fs.writeFile(
            this.packageLockPath,
            JSON.stringify(lockFile, null, 2)
        );
    }
}

// Manages npm client configuration, publishing and source verification for
// a private npm registry.
class PrivateNPMRegistry {
    constructor(registryUrl, authToken) {
        this.registryUrl = registryUrl;
        this.authToken = authToken;
    }
    
    // Run an npm subcommand without a shell.  The previous implementation
    // interpolated registry URLs, tokens and package names into shell
    // command strings, which is a command-injection vector; execFile passes
    // them as argv entries instead.
    _npm(args, options = {}) {
        const { execFile } = require('child_process');
        const { promisify } = require('util');
        return promisify(execFile)('npm', args, options);
    }
    
    // Point the npm client at the private registry and authenticate.
    async configureNPM() {
        // Set registry configuration
        await this._npm(['config', 'set', 'registry', this.registryUrl]);
        
        // Set authentication
        // WARNING: `_auth` persists base64 credentials in the user's npmrc;
        // prefer scoped `//host/:_authToken` entries or environment-provided
        // tokens in CI.
        const auth = Buffer.from(`:${this.authToken}`).toString('base64');
        await this._npm(['config', 'set', '_auth', auth]);
        
        // Route all @mycompany-scoped packages through the private registry.
        await this._npm(['config', 'set', '@mycompany:registry', this.registryUrl]);
        
        // Set strict SSL (should be true in production)
        await this._npm(['config', 'set', 'strict-ssl', 'true']);
    }
    
    // Publish the package at packagePath to the private registry.
    async publishPackage(packagePath) {
        // Pin the target registry in package.json so an accidental publish
        // cannot go to the public registry.
        const pkgJsonPath = path.join(packagePath, 'package.json');
        const pkgJson = JSON.parse(await fs.readFile(pkgJsonPath, 'utf8'));
        
        pkgJson.publishConfig = {
            registry: this.registryUrl
        };
        
        await fs.writeFile(pkgJsonPath, JSON.stringify(pkgJson, null, 2));
        
        // Publish with the token exposed only through the environment.
        await this._npm(['publish'], {
            cwd: packagePath,
            env: {
                ...process.env,
                NPM_TOKEN: this.authToken
            }
        });
    }
    
    // Best-effort check that packageName resolves from the private registry.
    async verifyPackageSource(packageName) {
        try {
            // Get package info (argv array -- packageName cannot inject).
            const { stdout } = await this._npm(['view', packageName, '--json']);
            const pkgInfo = JSON.parse(stdout);
            
            // NOTE(review): `_resolved` is install-time metadata; `npm view`
            // output may not include it -- confirm against the npm version
            // in use, or compare `dist.tarball` against the registry URL.
            return Boolean(pkgInfo._resolved && pkgInfo._resolved.startsWith(this.registryUrl));
        } catch (error) {
            return false;
        }
    }
}

// Dependency allowlist/blocklist management: enforces organisational rules
// about which packages (and which versions/licenses) a project may use.
class DependencyPolicy {
    constructor() {
        this.allowlist = new Set();   // if non-empty, only these packages are allowed
        this.blocklist = new Set();   // packages that are always rejected
        this.policies = {};           // per-package rules: { allowedVersions, requiredLicense }
    }
    
    // Load allowlist/blocklist/policies from a JSON policy file.  Entries
    // merge into (rather than replace) any already-loaded policy.
    async loadPolicy(policyPath) {
        const policy = JSON.parse(await fs.readFile(policyPath, 'utf8'));
        
        if (policy.allowlist) {
            policy.allowlist.forEach(pkg => this.allowlist.add(pkg));
        }
        
        if (policy.blocklist) {
            policy.blocklist.forEach(pkg => this.blocklist.add(pkg));
        }
        
        if (policy.policies) {
            Object.assign(this.policies, policy.policies);
        }
    }
    
    // Check a project's package.json (dependencies + devDependencies)
    // against the loaded policy; returns the list of violations found.
    async enforcePolicy(projectPath) {
        const packageJson = JSON.parse(
            await fs.readFile(path.join(projectPath, 'package.json'), 'utf8')
        );
        
        const allDeps = {
            ...packageJson.dependencies,
            ...packageJson.devDependencies
        };
        
        const violations = [];
        
        for (const [name, version] of Object.entries(allDeps)) {
            // Blocklisted packages are always violations.
            if (this.blocklist.has(name)) {
                violations.push({
                    package: name,
                    type: 'blocklisted',
                    message: `Package ${name} is blocklisted`
                });
            }
            
            // When an allowlist is configured, everything outside it is
            // rejected.
            if (this.allowlist.size > 0 && !this.allowlist.has(name)) {
                violations.push({
                    package: name,
                    type: 'not-allowlisted',
                    message: `Package ${name} is not in allowlist`
                });
            }
            
            // Per-package policies: version and license constraints.
            const policy = this.policies[name];
            if (policy) {
                if (policy.allowedVersions && !policy.allowedVersions.includes(version)) {
                    violations.push({
                        package: name,
                        type: 'version-policy',
                        message: `Version ${version} not allowed for ${name}`
                    });
                }
                
                if (policy.requiredLicense) {
                    const pkgInfo = await this.getPackageInfo(name);
                    if (pkgInfo.license !== policy.requiredLicense) {
                        violations.push({
                            package: name,
                            type: 'license-policy',
                            message: `License ${pkgInfo.license} not allowed for ${name}`
                        });
                    }
                }
            }
        }
        
        return violations;
    }
    
    // Fetch registry metadata for a package via `npm view`.
    //
    // This method was referenced by enforcePolicy but never defined, so any
    // `requiredLicense` policy crashed at runtime.  npm is invoked via
    // execFile (no shell) so the package name cannot inject commands, and a
    // failed lookup returns `{ license: null }` so the license comparison
    // fails closed.
    async getPackageInfo(packageName) {
        const { execFile } = require('child_process');
        const { promisify } = require('util');
        try {
            const { stdout } = await promisify(execFile)(
                'npm', ['view', packageName, '--json']
            );
            return JSON.parse(stdout);
        } catch (error) {
            return { license: null };
        }
    }
}

Managing dependencies securely is crucial for protecting against supply chain attacks. By implementing comprehensive vulnerability scanning, package verification, and policy enforcement, development teams can significantly reduce the risk of introducing malicious or vulnerable code through dependencies. Regular audits and automated security checks should be integrated into the development workflow to maintain ongoing security.

## Secure Error Handling and Logging

Error handling and logging are critical components of application security that often receive insufficient attention. Poor error handling can leak sensitive information to attackers, while inadequate logging can leave security incidents undetected. This chapter explores comprehensive strategies for implementing secure error handling and logging in Python and JavaScript applications, ensuring that errors are managed safely while maintaining the visibility needed for security monitoring and incident response.