Implementing Secure Error Handling
Implementing Secure Error Handling
Secure error handling requires different approaches for different environments. Development environments need detailed error information for debugging, while production environments must sanitize all error output. The key is implementing a consistent error handling strategy that provides useful information to developers while protecting sensitive details from potential attackers.
# Python - Secure Error Handling
import logging
import traceback
import sys
from typing import Dict, Any, Optional, Callable
from functools import wraps
import json
from datetime import datetime
import hashlib
import uuid
from enum import Enum
class ErrorSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class SecureErrorHandler:
def __init__(self, app_name: str, environment: str = "production"):
self.app_name = app_name
self.environment = environment
self.error_mappings = self._initialize_error_mappings()
self.sensitive_patterns = self._initialize_sensitive_patterns()
def _initialize_error_mappings(self) -> Dict[type, Dict[str, Any]]:
"""Map exception types to safe error messages"""
return {
ValueError: {
"message": "Invalid input provided",
"status_code": 400,
"severity": ErrorSeverity.LOW
},
KeyError: {
"message": "Required data not found",
"status_code": 400,
"severity": ErrorSeverity.LOW
},
FileNotFoundError: {
"message": "Requested resource not found",
"status_code": 404,
"severity": ErrorSeverity.LOW
},
PermissionError: {
"message": "Access denied",
"status_code": 403,
"severity": ErrorSeverity.MEDIUM
},
ConnectionError: {
"message": "Service temporarily unavailable",
"status_code": 503,
"severity": ErrorSeverity.HIGH
},
# Database errors
"DatabaseError": {
"message": "Database operation failed",
"status_code": 500,
"severity": ErrorSeverity.HIGH
},
"IntegrityError": {
"message": "Data validation failed",
"status_code": 400,
"severity": ErrorSeverity.MEDIUM
}
}
def _initialize_sensitive_patterns(self) -> list:
"""Patterns that indicate sensitive information"""
return [
r"password\s*[:=]\s*['\"]?([^'\"]+)['\"]?",
r"api[_-]?key\s*[:=]\s*['\"]?([^'\"]+)['\"]?",
r"secret\s*[:=]\s*['\"]?([^'\"]+)['\"]?",
r"token\s*[:=]\s*['\"]?([^'\"]+)['\"]?",
r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", # Credit card
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # Email
r"/home/[^/]+/", # Unix home paths
r"C:\\Users\\[^\\]+\\", # Windows paths
r"postgres://[^@]+@", # Database URLs
r"mysql://[^@]+@"
]
def handle_exception(self, exception: Exception,
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Handle exception securely based on environment"""
# Generate unique error ID for tracking
error_id = str(uuid.uuid4())
# Get safe error details
error_type = type(exception)
error_mapping = self.error_mappings.get(
error_type,
self.error_mappings.get(
exception.__class__.__name__,
{
"message": "An unexpected error occurred",
"status_code": 500,
"severity": ErrorSeverity.HIGH
}
)
)
# Build response based on environment
if self.environment == "production":
response = {
"error": error_mapping["message"],
"error_id": error_id,
"timestamp": datetime.utcnow().isoformat()
}
else:
# Development environment - include more details
response = {
"error": error_mapping["message"],
"error_id": error_id,
"error_type": error_type.__name__,
"timestamp": datetime.utcnow().isoformat(),
"details": self._sanitize_error_details(str(exception))
}
# Log full error details securely
self._log_error(error_id, exception, error_mapping["severity"], context)
return response
def _sanitize_error_details(self, error_message: str) -> str:
"""Remove sensitive information from error messages"""
import re
sanitized = error_message
# Replace sensitive patterns
for pattern in self.sensitive_patterns:
sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)
# Remove file paths
sanitized = re.sub(r"(/[a-zA-Z0-9._-]+)+/?", "[PATH]", sanitized)
sanitized = re.sub(r"([A-Z]:[\\/][^\"']+)", "[PATH]", sanitized)
# Remove line numbers that might reveal code structure
sanitized = re.sub(r"line \d+", "line [N]", sanitized)
return sanitized
def _log_error(self, error_id: str, exception: Exception,
severity: ErrorSeverity, context: Dict[str, Any] = None):
"""Securely log error details for debugging"""
log_entry = {
"error_id": error_id,
"timestamp": datetime.utcnow().isoformat(),
"app_name": self.app_name,
"environment": self.environment,
"severity": severity.value,
"error_type": type(exception).__name__,
"error_message": self._sanitize_error_details(str(exception)),
"context": context or {}
}
# Include stack trace for non-production environments
if self.environment != "production":
log_entry["stack_trace"] = self._sanitize_stack_trace(
traceback.format_exc()
)
else:
# In production, store hash of stack trace for deduplication
stack_trace = traceback.format_exc()
log_entry["stack_trace_hash"] = hashlib.sha256(
stack_trace.encode()
).hexdigest()[:16]
# Log to secure logging system
logging.error(json.dumps(log_entry))
def secure_error_handler(severity: ErrorSeverity = ErrorSeverity.MEDIUM):
"""Decorator for secure error handling"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
handler = SecureErrorHandler(
app_name="MyApp",
environment=os.environ.get("ENVIRONMENT", "production")
)
# Extract context from function
context = {
"function": func.__name__,
"module": func.__module__,
"args_count": len(args),
"kwargs_keys": list(kwargs.keys())
}
error_response = handler.handle_exception(e, context)
# Re-raise or return based on severity
if severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
raise SecureException(error_response["error"])
else:
return {"success": False, **error_response}
return wrapper
return decorator
class SecureException(Exception):
"""Base exception class for secure error handling"""
def __init__(self, message: str, error_id: str = None,
status_code: int = 500, details: Dict = None):
self.message = message
self.error_id = error_id or str(uuid.uuid4())
self.status_code = status_code
self.details = details or {}
super().__init__(self.message)
def to_dict(self) -> Dict[str, Any]:
"""Convert exception to safe dictionary representation"""
return {
"error": self.message,
"error_id": self.error_id,
"status_code": self.status_code,
"timestamp": datetime.utcnow().isoformat()
}
class SecureLogger:
"""Secure logging implementation with data sanitization"""
def __init__(self, name: str, log_level: str = "INFO"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, log_level))
self.sensitive_fields = self._get_sensitive_fields()
self._setup_handlers()
def _get_sensitive_fields(self) -> set:
"""Fields that should be redacted in logs"""
return {
'password', 'passwd', 'pwd', 'secret', 'token',
'api_key', 'apikey', 'auth', 'authorization',
'cookie', 'session', 'credit_card', 'cc_number',
'cvv', 'ssn', 'social_security', 'tax_id'
}
def _setup_handlers(self):
"""Configure logging handlers with security filters"""
# Console handler for development
if os.environ.get("ENVIRONMENT") != "production":
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
)
self.logger.addHandler(console_handler)
# File handler with rotation
from logging.handlers import RotatingFileHandler
file_handler = RotatingFileHandler(
'logs/secure_app.log',
maxBytes=10485760, # 10MB
backupCount=5
)
file_handler.setFormatter(
logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
)
self.logger.addHandler(file_handler)
def _sanitize_data(self, data: Any) -> Any:
"""Recursively sanitize sensitive data"""
if isinstance(data, dict):
sanitized = {}
for key, value in data.items():
if any(sensitive in key.lower() for sensitive in self.sensitive_fields):
sanitized[key] = "[REDACTED]"
else:
sanitized[key] = self._sanitize_data(value)
return sanitized
elif isinstance(data, list):
return [self._sanitize_data(item) for item in data]
elif isinstance(data, str):
# Check for patterns in strings
import re
sanitized = data
# Redact credit card numbers
sanitized = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
'[CREDIT_CARD]', sanitized)
# Redact SSNs
sanitized = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', sanitized)
# Redact email addresses in sensitive contexts
if any(word in data.lower() for word in ['password', 'secret']):
sanitized = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
'[EMAIL]', sanitized)
return sanitized
else:
return data
def log(self, level: str, message: str, data: Dict[str, Any] = None,
user_id: str = None, request_id: str = None):
"""Log message with structured data"""
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"message": message,
"user_id": user_id,
"request_id": request_id or str(uuid.uuid4())
}
if data:
log_entry["data"] = self._sanitize_data(data)
# Add security context
log_entry["security_context"] = {
"ip_address": self._get_client_ip(),
"user_agent": self._get_user_agent(),
"session_id": self._get_session_id()
}
# Log as JSON for easier parsing
self.logger.log(
getattr(logging, level),
json.dumps(log_entry)
)
def log_security_event(self, event_type: str, details: Dict[str, Any],
severity: ErrorSeverity = ErrorSeverity.MEDIUM):
"""Log security-specific events"""
security_event = {
"event_type": event_type,
"severity": severity.value,
"details": self._sanitize_data(details),
"timestamp": datetime.utcnow().isoformat()
}
# Always log security events
self.logger.warning(f"SECURITY_EVENT: {json.dumps(security_event)}")
# Alert on critical security events
if severity == ErrorSeverity.CRITICAL:
self._send_security_alert(security_event)
def _send_security_alert(self, event: Dict[str, Any]):
"""Send alerts for critical security events"""
# Implement alerting mechanism (email, Slack, PagerDuty, etc.)
pass
# Audit logging for compliance
class AuditLogger:
"""Tamper-proof audit logging for compliance requirements"""
def __init__(self, app_name: str):
self.app_name = app_name
self.audit_file = f"logs/{app_name}_audit.log"
def log_event(self, event_type: str, user_id: str,
resource: str, action: str,
result: str, details: Dict[str, Any] = None):
"""Log audit event with integrity protection"""
audit_entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_id": str(uuid.uuid4()),
"app_name": self.app_name,
"event_type": event_type,
"user_id": user_id,
"resource": resource,
"action": action,
"result": result,
"details": details or {}
}
# Calculate integrity hash
entry_string = json.dumps(audit_entry, sort_keys=True)
audit_entry["integrity_hash"] = hashlib.sha256(
entry_string.encode()
).hexdigest()
# Append to audit log (append-only)
with open(self.audit_file, 'a') as f:
f.write(json.dumps(audit_entry) + '\n')
# Also send to centralized logging system
self._send_to_siem(audit_entry)
def verify_integrity(self) -> bool:
"""Verify audit log hasn't been tampered with"""
try:
with open(self.audit_file, 'r') as f:
for line in f:
entry = json.loads(line.strip())
stored_hash = entry.pop("integrity_hash")
# Recalculate hash
calculated_hash = hashlib.sha256(
json.dumps(entry, sort_keys=True).encode()
).hexdigest()
if stored_hash != calculated_hash:
return False
return True
except Exception:
return False
// JavaScript - Secure Error Handling and Logging
const crypto = require('crypto');
const fs = require('fs').promises;
const path = require('path');
const { v4: uuidv4 } = require('uuid');
class SecureErrorHandler {
constructor(appName, environment = 'production') {
this.appName = appName;
this.environment = environment;
this.errorMappings = this.initializeErrorMappings();
this.sensitivePatterns = this.initializeSensitivePatterns();
}
initializeErrorMappings() {
return {
'ValidationError': {
message: 'Invalid input provided',
statusCode: 400,
severity: 'low'
},
'UnauthorizedError': {
message: 'Authentication required',
statusCode: 401,
severity: 'medium'
},
'ForbiddenError': {
message: 'Access denied',
statusCode: 403,
severity: 'medium'
},
'NotFoundError': {
message: 'Resource not found',
statusCode: 404,
severity: 'low'
},
'DatabaseError': {
message: 'Database operation failed',
statusCode: 500,
severity: 'high'
},
'NetworkError': {
message: 'Service temporarily unavailable',
statusCode: 503,
severity: 'high'
}
};
}
initializeSensitivePatterns() {
return [
/password\s*[:=]\s*['"']?([^'"']+)['"']?/gi,
/api[_-]?key\s*[:=]\s*['"']?([^'"']+)['"']?/gi,
/secret\s*[:=]\s*['"']?([^'"']+)['"']?/gi,
/token\s*[:=]\s*['"']?([^'"']+)['"']?/gi,
/\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/g, // Credit card
/\b\d{3}-\d{2}-\d{4}\b/g, // SSN
/[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g, // Email
/\/home\/[^\/]+\//g, // Unix paths
/C:\\Users\\[^\\]+\\/g, // Windows paths
/(mongodb|postgres|mysql):\/\/[^@]+@/g // Database URLs
];
}
handleError(error, context = {}) {
// Generate unique error ID
const errorId = uuidv4();
// Get error mapping
const errorName = error.name || error.constructor.name;
const errorMapping = this.errorMappings[errorName] || {
message: 'An unexpected error occurred',
statusCode: 500,
severity: 'high'
};
// Build response based on environment
let response;
if (this.environment === 'production') {
response = {
error: errorMapping.message,
errorId,
timestamp: new Date().toISOString()
};
} else {
// Development - include sanitized details
response = {
error: errorMapping.message,
errorId,
errorType: errorName,
timestamp: new Date().toISOString(),
details: this.sanitizeErrorDetails(error.message),
stack: this.sanitizeStackTrace(error.stack)
};
}
// Log full error securely
this.logError(errorId, error, errorMapping.severity, context);
return response;
}
sanitizeErrorDetails(message) {
if (!message) return '';
let sanitized = message;
// Replace sensitive patterns
for (const pattern of this.sensitivePatterns) {
sanitized = sanitized.replace(pattern, '[REDACTED]');
}
// Remove file paths
sanitized = sanitized.replace(/\/[\w\-\/.]+/g, '[PATH]');
sanitized = sanitized.replace(/[A-Z]:\\[\w\-\\.\\]+/g, '[PATH]');
// Remove line numbers
sanitized = sanitized.replace(/line \d+/g, 'line [N]');
sanitized = sanitized.replace(/:\d+:\d+/g, ':[N]:[N]');
return sanitized;
}
sanitizeStackTrace(stack) {
if (!stack || this.environment === 'production') return undefined;
// Remove sensitive file paths but keep structure
let sanitized = stack;
// Replace absolute paths with relative ones
sanitized = sanitized.replace(/\/[\w\-\/.]+\/node_modules/g, './node_modules');
sanitized = sanitized.replace(/\/[\w\-\/.]+\/src/g, './src');
// Remove sensitive data from stack
for (const pattern of this.sensitivePatterns) {
sanitized = sanitized.replace(pattern, '[REDACTED]');
}
return sanitized;
}
logError(errorId, error, severity, context) {
const logEntry = {
errorId,
timestamp: new Date().toISOString(),
appName: this.appName,
environment: this.environment,
severity,
errorType: error.name || error.constructor.name,
errorMessage: this.sanitizeErrorDetails(error.message),
context,
stackTraceHash: this.hashStackTrace(error.stack)
};
// In development, include sanitized stack
if (this.environment !== 'production') {
logEntry.stack = this.sanitizeStackTrace(error.stack);
}
// Log to secure logging system
console.error(JSON.stringify(logEntry));
}
hashStackTrace(stack) {
if (!stack) return null;
return crypto.createHash('sha256')
.update(stack)
.digest('hex')
.substring(0, 16);
}
}
// Secure error handling middleware for Express
function secureErrorMiddleware(appName, environment) {
const errorHandler = new SecureErrorHandler(appName, environment);
return (err, req, res, next) => {
// Extract context from request
const context = {
method: req.method,
path: req.path,
userAgent: req.get('user-agent'),
ip: req.ip,
userId: req.user?.id
};
// Handle error securely
const errorResponse = errorHandler.handleError(err, context);
// Set security headers
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
// Send response
res.status(err.statusCode || 500).json(errorResponse);
};
}
class SecureLogger {
constructor(name, config = {}) {
this.name = name;
this.config = {
logLevel: 'info',
maxFileSize: 10 * 1024 * 1024, // 10MB
maxFiles: 5,
...config
};
this.sensitiveFields = new Set([
'password', 'passwd', 'pwd', 'secret', 'token',
'api_key', 'apikey', 'auth', 'authorization',
'cookie', 'session', 'credit_card', 'cc_number',
'cvv', 'ssn', 'social_security', 'tax_id'
]);
}
sanitizeData(data) {
if (!data) return data;
if (typeof data === 'object' && !Array.isArray(data)) {
const sanitized = {};
for (const [key, value] of Object.entries(data)) {
if (this.isSensitiveField(key)) {
sanitized[key] = '[REDACTED]';
} else if (typeof value === 'object') {
sanitized[key] = this.sanitizeData(value);
} else if (typeof value === 'string') {
sanitized[key] = this.sanitizeString(value);
} else {
sanitized[key] = value;
}
}
return sanitized;
} else if (Array.isArray(data)) {
return data.map(item => this.sanitizeData(item));
} else if (typeof data === 'string') {
return this.sanitizeString(data);
}
return data;
}
isSensitiveField(fieldName) {
const lowerField = fieldName.toLowerCase();
return Array.from(this.sensitiveFields).some(sensitive =>
lowerField.includes(sensitive)
);
}
sanitizeString(str) {
let sanitized = str;
// Redact credit card numbers
sanitized = sanitized.replace(
/\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/g,
'[CREDIT_CARD]'
);
// Redact SSNs
sanitized = sanitized.replace(
/\b\d{3}-\d{2}-\d{4}\b/g,
'[SSN]'
);
// Redact JWTs
sanitized = sanitized.replace(
/Bearer\s+[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+/g,
'Bearer [JWT]'
);
return sanitized;
}
log(level, message, data = {}, meta = {}) {
const logEntry = {
timestamp: new Date().toISOString(),
level,
logger: this.name,
message,
data: this.sanitizeData(data),
...meta
};
// Add request context if available
if (global.requestContext) {
logEntry.requestId = global.requestContext.requestId;
logEntry.userId = global.requestContext.userId;
}
// Log based on level
this.writeLog(logEntry);
}
async writeLog(logEntry) {
// Convert to JSON
const logLine = JSON.stringify(logEntry) + '\n';
// Write to file (with rotation)
const logFile = `logs/${this.name}.log`;
try {
const stats = await fs.stat(logFile);
// Rotate if necessary
if (stats.size > this.config.maxFileSize) {
await this.rotateLog(logFile);
}
} catch (error) {
// File doesn't exist yet
}
// Append to log file
await fs.appendFile(logFile, logLine);
}
async rotateLog(logFile) {
// Rotate log files
for (let i = this.config.maxFiles - 1; i > 0; i--) {
const oldFile = `${logFile}.${i}`;
const newFile = `${logFile}.${i + 1}`;
try {
await fs.rename(oldFile, newFile);
} catch (error) {
// File doesn't exist
}
}
// Rename current file
await fs.rename(logFile, `${logFile}.1`);
}
// Convenience methods
debug(message, data, meta) {
this.log('debug', message, data, meta);
}
info(message, data, meta) {
this.log('info', message, data, meta);
}
warn(message, data, meta) {
this.log('warn', message, data, meta);
}
error(message, data, meta) {
this.log('error', message, data, meta);
}
security(event, details, severity = 'medium') {
this.log('security', event, details, { severity });
// Alert on critical security events
if (severity === 'critical') {
this.sendSecurityAlert({ event, details, severity });
}
}
async sendSecurityAlert(alert) {
// Implement alerting (email, Slack, etc.)
console.error('SECURITY ALERT:', alert);
}
}
// Audit logger for compliance
class AuditLogger {
constructor(appName) {
this.appName = appName;
this.auditFile = `logs/${appName}_audit.log`;
}
async logAuditEvent(event) {
const auditEntry = {
timestamp: new Date().toISOString(),
eventId: uuidv4(),
appName: this.appName,
...event
};
// Calculate integrity hash
const entryString = JSON.stringify(auditEntry, null, 0);
auditEntry.integrityHash = crypto
.createHash('sha256')
.update(entryString)
.digest('hex');
// Append to audit log
await fs.appendFile(
this.auditFile,
JSON.stringify(auditEntry) + '\n'
);
// Send to SIEM
await this.sendToSIEM(auditEntry);
}
async verifyIntegrity() {
try {
const content = await fs.readFile(this.auditFile, 'utf8');
const lines = content.trim().split('\n');
for (const line of lines) {
const entry = JSON.parse(line);
const storedHash = entry.integrityHash;
delete entry.integrityHash;
const calculatedHash = crypto
.createHash('sha256')
.update(JSON.stringify(entry, null, 0))
.digest('hex');
if (storedHash !== calculatedHash) {
return false;
}
}
return true;
} catch (error) {
return false;
}
}
async sendToSIEM(entry) {
// Implement SIEM integration
// Example: send to Elasticsearch, Splunk, etc.
}
}
// Express request logging middleware
function requestLoggingMiddleware(logger) {
return (req, res, next) => {
const requestId = uuidv4();
const startTime = Date.now();
// Store in request context
req.requestId = requestId;
// Log request
logger.info('Incoming request', {
requestId,
method: req.method,
path: req.path,
query: req.query,
headers: {
'user-agent': req.get('user-agent'),
'content-type': req.get('content-type')
}
});
// Capture response
const originalSend = res.send;
res.send = function(data) {
res.send = originalSend;
// Log response
logger.info('Outgoing response', {
requestId,
statusCode: res.statusCode,
duration: Date.now() - startTime,
contentLength: res.get('content-length')
});
return res.send(data);
};
next();
};
}
Implementing secure error handling and logging is essential for maintaining application security while providing the visibility needed for debugging and incident response. By sanitizing error messages, protecting sensitive data in logs, and implementing proper audit trails, applications can handle errors gracefully without exposing vulnerabilities to attackers.## Security Testing and Code Analysis
Security testing and code analysis are essential practices that help identify vulnerabilities before they reach production. Modern development workflows must integrate security testing at every stage, from development through deployment. This chapter explores comprehensive strategies for implementing security testing and code analysis in Python and JavaScript applications, covering static analysis, dynamic testing, dependency scanning, and security-focused unit testing.