Secure Dependency Management Practices
Secure Dependency Management Practices
Managing dependencies securely requires a comprehensive approach that includes careful package selection, continuous monitoring, and automated security checks. Both Python and JavaScript ecosystems provide tools and best practices for dependency management, but developers must actively implement these measures to maintain security.
# Python - Secure Dependency Management
import subprocess
import json
import hashlib
import requests
from typing import Dict, List, Set, Optional
from datetime import datetime, timedelta
import toml
import pkg_resources
from packaging import version
import ast
import os
class PythonDependencyManager:
def __init__(self, project_path: str):
self.project_path = project_path
self.requirements_file = os.path.join(project_path, 'requirements.txt')
self.pipfile = os.path.join(project_path, 'Pipfile')
self.pyproject_toml = os.path.join(project_path, 'pyproject.toml')
def audit_dependencies(self) -> Dict[str, List[Dict]]:
"""Comprehensive dependency security audit"""
vulnerabilities = {
'critical': [],
'high': [],
'medium': [],
'low': []
}
# Run pip-audit for vulnerability scanning
try:
result = subprocess.run(
['pip-audit', '--format', 'json', '--desc'],
capture_output=True,
text=True,
cwd=self.project_path
)
if result.returncode == 0:
audit_data = json.loads(result.stdout)
for vuln in audit_data:
severity = self._classify_severity(vuln.get('fix_versions', []))
vulnerabilities[severity].append({
'package': vuln['name'],
'installed_version': vuln['version'],
'vulnerability': vuln['vulnerability'],
'description': vuln.get('description', ''),
'fix_versions': vuln.get('fix_versions', [])
})
except subprocess.CalledProcessError as e:
print(f"pip-audit failed: {e}")
# Check for outdated packages
outdated = self._check_outdated_packages()
vulnerabilities['outdated'] = outdated
# Verify package integrity
integrity_issues = self._verify_package_integrity()
vulnerabilities['integrity'] = integrity_issues
return vulnerabilities
def _check_outdated_packages(self) -> List[Dict]:
"""Check for outdated packages that might have security fixes"""
outdated = []
try:
result = subprocess.run(
['pip', 'list', '--outdated', '--format', 'json'],
capture_output=True,
text=True
)
if result.returncode == 0:
packages = json.loads(result.stdout)
for pkg in packages:
# Check if update is a security release
if self._is_security_update(pkg['name'],
pkg['version'],
pkg['latest_version']):
outdated.append({
'package': pkg['name'],
'current': pkg['version'],
'latest': pkg['latest_version'],
'type': pkg.get('latest_filetype', 'wheel')
})
except Exception as e:
print(f"Failed to check outdated packages: {e}")
return outdated
def _verify_package_integrity(self) -> List[Dict]:
"""Verify installed packages haven't been tampered with"""
issues = []
for dist in pkg_resources.working_set:
try:
# Get package metadata
metadata = dist.get_metadata('RECORD')
for line in metadata.splitlines():
if not line.strip():
continue
parts = line.split(',')
if len(parts) >= 3:
file_path, expected_hash, size = parts[0], parts[1], parts[2]
# Verify file hash
full_path = os.path.join(dist.location, file_path)
if os.path.exists(full_path):
actual_hash = self._calculate_file_hash(full_path)
if expected_hash and actual_hash != expected_hash:
issues.append({
'package': dist.project_name,
'file': file_path,
'expected_hash': expected_hash,
'actual_hash': actual_hash
})
except Exception:
# Skip packages without RECORD files
pass
return issues
def _calculate_file_hash(self, file_path: str) -> str:
"""Calculate SHA256 hash of a file"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return f"sha256={sha256_hash.hexdigest()}"
def generate_requirements_lock(self) -> str:
"""Generate locked requirements with hashes"""
locked_requirements = []
for dist in pkg_resources.working_set:
# Get exact version with hash
requirement = f"{dist.project_name}=={dist.version}"
# Add hash if available
try:
# Get the wheel or sdist file
if hasattr(dist, '_provider'):
provider = dist._provider
if hasattr(provider, 'egg_info'):
egg_info = provider.egg_info
record_path = os.path.join(egg_info, 'RECORD')
if os.path.exists(record_path):
requirement += f" \\\n --hash=sha256:{self._get_dist_hash(dist)}"
except Exception:
pass
locked_requirements.append(requirement)
return '\n'.join(sorted(locked_requirements))
def check_package_permissions(self, package_name: str) -> Dict[str, any]:
"""Analyze what permissions a package requests"""
permissions = {
'network_access': False,
'file_system_access': False,
'subprocess_execution': False,
'dynamic_imports': False,
'suspicious_patterns': []
}
try:
# Get package location
dist = pkg_resources.get_distribution(package_name)
package_location = dist.location
# Scan Python files for suspicious patterns
for root, dirs, files in os.walk(package_location):
for file in files:
if file.endswith('.py'):
file_path = os.path.join(root, file)
permissions = self._analyze_python_file(file_path, permissions)
except Exception as e:
permissions['error'] = str(e)
return permissions
def _analyze_python_file(self, file_path: str,
permissions: Dict) -> Dict:
"""Analyze Python file for security-relevant patterns"""
dangerous_imports = {
'socket': 'network_access',
'urllib': 'network_access',
'requests': 'network_access',
'subprocess': 'subprocess_execution',
'os': 'file_system_access',
'eval': 'dynamic_execution',
'exec': 'dynamic_execution',
'__import__': 'dynamic_imports'
}
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
for node in ast.walk(tree):
# Check imports
if isinstance(node, ast.Import):
for alias in node.names:
if alias.name in dangerous_imports:
perm_type = dangerous_imports[alias.name]
if perm_type in permissions:
permissions[perm_type] = True
# Check function calls
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
func_name = node.func.id
if func_name in ['eval', 'exec']:
permissions['suspicious_patterns'].append(
f"Dynamic code execution: {func_name}"
)
# Check for obfuscated code
elif isinstance(node, ast.Str):
if 'eval' in node.s or 'exec' in node.s:
permissions['suspicious_patterns'].append(
"Possible obfuscated code execution"
)
except Exception:
pass
return permissions
class DependencyPinner:
"""Ensure all dependencies are pinned to specific versions"""
def __init__(self, requirements_file: str):
self.requirements_file = requirements_file
def pin_all_dependencies(self):
"""Pin all dependencies to exact versions"""
pinned_requirements = []
# Parse current requirements
with open(self.requirements_file, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
pinned_requirements.append(line)
continue
# Parse requirement
if '==' in line:
# Already pinned
pinned_requirements.append(line)
else:
# Get installed version
package_name = line.split('>=')[0].split('<=')[0].split('~=')[0].strip()
try:
dist = pkg_resources.get_distribution(package_name)
pinned_requirements.append(f"{package_name}=={dist.version}")
except Exception:
pinned_requirements.append(line)
# Write back
with open(self.requirements_file, 'w') as f:
f.write('\n'.join(pinned_requirements))
class PrivatePackageRegistry:
"""Manage private package registry for internal packages"""
def __init__(self, registry_url: str, auth_token: str):
self.registry_url = registry_url
self.auth_token = auth_token
def configure_pip(self):
"""Configure pip to use private registry"""
pip_conf = f"""
[global]
index-url = {self.registry_url}/simple/
trusted-host = {self.registry_url.split('//')[1].split('/')[0]}
extra-index-url = https://pypi.org/simple/
[install]
find-links = {self.registry_url}/packages/
"""
# Write pip configuration
pip_dir = os.path.expanduser('~/.pip')
os.makedirs(pip_dir, exist_ok=True)
with open(os.path.join(pip_dir, 'pip.conf'), 'w') as f:
f.write(pip_conf)
def upload_package(self, package_path: str):
"""Upload package to private registry"""
import twine.commands.upload
# Configure twine
settings = twine.settings.Settings(
repository_url=f"{self.registry_url}/legacy/",
username="__token__",
password=self.auth_token
)
# Upload package
twine.commands.upload.upload(
settings,
[package_path]
)
def verify_package_source(self, package_name: str) -> bool:
"""Verify package comes from private registry"""
try:
dist = pkg_resources.get_distribution(package_name)
# Check if installed from private registry
return self.registry_url in dist.location
except Exception:
return False
// JavaScript - Secure Dependency Management
const fs = require('fs').promises;
const path = require('path');
const crypto = require('crypto');
const { exec } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);
class NodeDependencyManager {
constructor(projectPath) {
this.projectPath = projectPath;
this.packageJsonPath = path.join(projectPath, 'package.json');
this.packageLockPath = path.join(projectPath, 'package-lock.json');
this.nodeModulesPath = path.join(projectPath, 'node_modules');
}
async auditDependencies() {
const vulnerabilities = {
critical: [],
high: [],
moderate: [],
low: [],
info: []
};
try {
// Run npm audit
const { stdout } = await execAsync('npm audit --json', {
cwd: this.projectPath
});
const auditData = JSON.parse(stdout);
// Process vulnerabilities
if (auditData.vulnerabilities) {
for (const [pkgName, vuln] of Object.entries(auditData.vulnerabilities)) {
const severity = vuln.severity;
vulnerabilities[severity].push({
package: pkgName,
severity: vuln.severity,
vulnerableVersions: vuln.range,
recommendation: vuln.fixAvailable,
paths: vuln.nodes
});
}
}
// Check for suspicious packages
const suspiciousPackages = await this.detectSuspiciousPackages();
vulnerabilities.suspicious = suspiciousPackages;
// Verify package integrity
const integrityIssues = await this.verifyPackageIntegrity();
vulnerabilities.integrity = integrityIssues;
} catch (error) {
console.error('Audit failed:', error);
}
return vulnerabilities;
}
async detectSuspiciousPackages() {
const suspicious = [];
const packageJson = JSON.parse(
await fs.readFile(this.packageJsonPath, 'utf8')
);
const allDependencies = {
...packageJson.dependencies,
...packageJson.devDependencies
};
for (const [name, version] of Object.entries(allDependencies)) {
// Check for typosquatting
if (this.isPossibleTyposquat(name)) {
suspicious.push({
package: name,
reason: 'Possible typosquatting',
similarTo: this.findSimilarPopularPackage(name)
});
}
// Check for suspicious version patterns
if (this.isSuspiciousVersion(version)) {
suspicious.push({
package: name,
reason: 'Suspicious version pattern',
version
});
}
// Check package age and downloads
const stats = await this.getPackageStats(name);
if (stats && this.isSuspiciousStats(stats)) {
suspicious.push({
package: name,
reason: 'Low downloads or recently published',
stats
});
}
}
// Scan for suspicious patterns in installed packages
const installedSuspicious = await this.scanInstalledPackages();
suspicious.push(...installedSuspicious);
return suspicious;
}
isPossibleTyposquat(packageName) {
const popularPackages = [
'express', 'react', 'lodash', 'axios', 'moment',
'webpack', 'babel', 'typescript', 'jest', 'eslint'
];
// Check for common typosquatting patterns
for (const popular of popularPackages) {
// Character substitution
const distance = this.levenshteinDistance(packageName, popular);
if (distance === 1) {
return true;
}
// Common misspellings
const variations = [
popular.replace(/s$/, 'z'), // pluralization
popular.replace(/z$/, 's'),
popular + '-js',
popular + 'js',
popular.replace(/-/g, ''),
popular.replace(/([a-z])([A-Z])/g, '$1-$2').toLowerCase()
];
if (variations.includes(packageName)) {
return true;
}
}
return false;
}
levenshteinDistance(str1, str2) {
const matrix = [];
for (let i = 0; i <= str2.length; i++) {
matrix[i] = [i];
}
for (let j = 0; j <= str1.length; j++) {
matrix[0][j] = j;
}
for (let i = 1; i <= str2.length; i++) {
for (let j = 1; j <= str1.length; j++) {
if (str2.charAt(i - 1) === str1.charAt(j - 1)) {
matrix[i][j] = matrix[i - 1][j - 1];
} else {
matrix[i][j] = Math.min(
matrix[i - 1][j - 1] + 1,
matrix[i][j - 1] + 1,
matrix[i - 1][j] + 1
);
}
}
}
return matrix[str2.length][str1.length];
}
async scanInstalledPackages() {
const suspicious = [];
try {
const packages = await fs.readdir(this.nodeModulesPath);
for (const pkg of packages) {
if (pkg.startsWith('.') || pkg.startsWith('@')) continue;
const pkgPath = path.join(this.nodeModulesPath, pkg);
const pkgJsonPath = path.join(pkgPath, 'package.json');
try {
const pkgJson = JSON.parse(
await fs.readFile(pkgJsonPath, 'utf8')
);
// Check for suspicious scripts
if (pkgJson.scripts) {
const suspiciousScripts = [
'preinstall', 'postinstall', 'preuninstall', 'postuninstall'
];
for (const script of suspiciousScripts) {
if (pkgJson.scripts[script]) {
const scriptContent = pkgJson.scripts[script];
// Check for suspicious patterns
if (this.containsSuspiciousCode(scriptContent)) {
suspicious.push({
package: pkg,
reason: `Suspicious ${script} script`,
script: scriptContent
});
}
}
}
}
// Check for unexpected network access
const hasNetworkCode = await this.scanForNetworkAccess(pkgPath);
if (hasNetworkCode && !this.isExpectedToHaveNetwork(pkg)) {
suspicious.push({
package: pkg,
reason: 'Unexpected network access code'
});
}
} catch (error) {
// Skip packages without package.json
}
}
} catch (error) {
console.error('Package scan failed:', error);
}
return suspicious;
}
containsSuspiciousCode(code) {
const suspiciousPatterns = [
/eval\s*\(/,
/Function\s*\(/,
/child_process/,
/\.exec\s*\(/,
/crypto\s*\.\s*createCipher/,
/process\s*\.\s*env/,
/require\s*\(\s*['"`]fs['"`]\s*\)/,
/\bBuffer\s*\.\s*from\s*\(/,
/atob\s*\(/,
/btoa\s*\(/
];
return suspiciousPatterns.some(pattern => pattern.test(code));
}
async scanForNetworkAccess(packagePath) {
const networkModules = ['http', 'https', 'net', 'dgram', 'dns'];
const networkPackages = ['axios', 'request', 'node-fetch', 'got'];
try {
// Scan all JS files
const files = await this.getAllJSFiles(packagePath);
for (const file of files) {
const content = await fs.readFile(file, 'utf8');
// Check for network module imports
for (const module of networkModules) {
if (content.includes(`require('${module}')`) ||
content.includes(`require("${module}")`) ||
content.includes(`from '${module}'`) ||
content.includes(`from "${module}"`)) {
return true;
}
}
// Check for network package usage
for (const pkg of networkPackages) {
if (content.includes(`require('${pkg}')`) ||
content.includes(`require("${pkg}")`)) {
return true;
}
}
}
} catch (error) {
// Ignore scan errors
}
return false;
}
async getAllJSFiles(dir, files = []) {
const entries = await fs.readdir(dir, { withFileTypes: true });
for (const entry of entries) {
const fullPath = path.join(dir, entry.name);
if (entry.isDirectory() && entry.name !== 'node_modules') {
await this.getAllJSFiles(fullPath, files);
} else if (entry.isFile() && entry.name.endsWith('.js')) {
files.push(fullPath);
}
}
return files;
}
async verifyPackageIntegrity() {
const issues = [];
try {
// Read package-lock.json
const lockFile = JSON.parse(
await fs.readFile(this.packageLockPath, 'utf8')
);
// Verify each package
for (const [name, data] of Object.entries(lockFile.dependencies || {})) {
if (data.integrity) {
const packagePath = path.join(this.nodeModulesPath, name);
try {
// Calculate actual integrity
const actualIntegrity = await this.calculatePackageIntegrity(packagePath);
if (actualIntegrity !== data.integrity) {
issues.push({
package: name,
expected: data.integrity,
actual: actualIntegrity
});
}
} catch (error) {
// Package might not be installed
}
}
}
} catch (error) {
console.error('Integrity verification failed:', error);
}
return issues;
}
async generateLockFileWithIntegrity() {
// Force regeneration of lock file with integrity hashes
await execAsync('npm install --package-lock-only', {
cwd: this.projectPath
});
// Add additional security metadata
const lockFile = JSON.parse(
await fs.readFile(this.packageLockPath, 'utf8')
);
lockFile._meta = {
...lockFile._meta,
securityAudit: {
timestamp: new Date().toISOString(),
nodeVersion: process.version,
npmVersion: await this.getNpmVersion()
}
};
await fs.writeFile(
this.packageLockPath,
JSON.stringify(lockFile, null, 2)
);
}
}
class PrivateNPMRegistry {
constructor(registryUrl, authToken) {
this.registryUrl = registryUrl;
this.authToken = authToken;
}
async configureNPM() {
// Set registry configuration
await execAsync(`npm config set registry ${this.registryUrl}`);
// Set authentication
const auth = Buffer.from(`:${this.authToken}`).toString('base64');
await execAsync(`npm config set _auth ${auth}`);
// Configure scoped packages
await execAsync(`npm config set @mycompany:registry ${this.registryUrl}`);
// Set strict SSL (should be true in production)
await execAsync('npm config set strict-ssl true');
}
async publishPackage(packagePath) {
// Add registry metadata to package.json
const pkgJsonPath = path.join(packagePath, 'package.json');
const pkgJson = JSON.parse(await fs.readFile(pkgJsonPath, 'utf8'));
pkgJson.publishConfig = {
registry: this.registryUrl
};
await fs.writeFile(pkgJsonPath, JSON.stringify(pkgJson, null, 2));
// Publish to private registry
await execAsync('npm publish', {
cwd: packagePath,
env: {
...process.env,
NPM_TOKEN: this.authToken
}
});
}
async verifyPackageSource(packageName) {
try {
// Get package info
const { stdout } = await execAsync(`npm view ${packageName} --json`);
const pkgInfo = JSON.parse(stdout);
// Check if from private registry
return pkgInfo._resolved && pkgInfo._resolved.startsWith(this.registryUrl);
} catch (error) {
return false;
}
}
}
// Dependency allowlist/blocklist management
class DependencyPolicy {
constructor() {
this.allowlist = new Set();
this.blocklist = new Set();
this.policies = {};
}
async loadPolicy(policyPath) {
const policy = JSON.parse(await fs.readFile(policyPath, 'utf8'));
if (policy.allowlist) {
policy.allowlist.forEach(pkg => this.allowlist.add(pkg));
}
if (policy.blocklist) {
policy.blocklist.forEach(pkg => this.blocklist.add(pkg));
}
if (policy.policies) {
Object.assign(this.policies, policy.policies);
}
}
async enforcePolicy(projectPath) {
const packageJson = JSON.parse(
await fs.readFile(path.join(projectPath, 'package.json'), 'utf8')
);
const allDeps = {
...packageJson.dependencies,
...packageJson.devDependencies
};
const violations = [];
for (const [name, version] of Object.entries(allDeps)) {
// Check blocklist
if (this.blocklist.has(name)) {
violations.push({
package: name,
type: 'blocklisted',
message: `Package ${name} is blocklisted`
});
}
// Check allowlist (if configured)
if (this.allowlist.size > 0 && !this.allowlist.has(name)) {
violations.push({
package: name,
type: 'not-allowlisted',
message: `Package ${name} is not in allowlist`
});
}
// Check specific policies
if (this.policies[name]) {
const policy = this.policies[name];
// Version constraints
if (policy.allowedVersions && !policy.allowedVersions.includes(version)) {
violations.push({
package: name,
type: 'version-policy',
message: `Version ${version} not allowed for ${name}`
});
}
// License constraints
if (policy.requiredLicense) {
const pkgInfo = await this.getPackageInfo(name);
if (pkgInfo.license !== policy.requiredLicense) {
violations.push({
package: name,
type: 'license-policy',
message: `License ${pkgInfo.license} not allowed for ${name}`
});
}
}
}
}
return violations;
}
}
Managing dependencies securely is crucial for protecting against supply chain attacks. By implementing comprehensive vulnerability scanning, package verification, and policy enforcement, development teams can significantly reduce the risk of introducing malicious or vulnerable code through dependencies. Regular audits and automated security checks should be integrated into the development workflow to maintain ongoing security.## Secure Error Handling and Logging
Error handling and logging are critical components of application security that often receive insufficient attention. Poor error handling can leak sensitive information to attackers, while inadequate logging can leave security incidents undetected. This chapter explores comprehensive strategies for implementing secure error handling and logging in Python and JavaScript applications, ensuring that errors are managed safely while maintaining the visibility needed for security monitoring and incident response.