Designing a Comprehensive Logging Strategy
API security logging requires capturing the right data at the right level of detail. Too little logging leaves security blind spots, while too much logging creates noise that obscures important events and wastes storage and processing resources. The key is to identify which security-relevant events to log and to structure that data for effective analysis.
Security-relevant events extend beyond simple access logs. Authentication attempts, authorization decisions, input validation failures, rate limit violations, and unusual behavior patterns all provide valuable security intelligence. Each event type requires specific data points to enable meaningful analysis and incident investigation.
# Python comprehensive API security logging framework
import hashlib
import json
import logging
import time
import traceback
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import structlog
# Configure structured logging
# Processor chain, applied in list order to every event. The JSON renderer is
# kept as the final entry, exactly as in the original configuration.
_STRUCTLOG_PROCESSORS = [
    structlog.stdlib.filter_by_level,
    structlog.stdlib.add_logger_name,
    structlog.stdlib.add_log_level,
    structlog.stdlib.PositionalArgumentsFormatter(),
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    structlog.processors.UnicodeDecoder(),
    structlog.processors.JSONRenderer(),
]

structlog.configure(
    processors=_STRUCTLOG_PROCESSORS,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
class SecurityLogger:
    """Structured security-event logger for an API service.

    Each public ``log_*`` method assembles a flat dict of event fields and
    emits it through the structlog logger bound to ``service_name``. The log
    level is chosen from the outcome (success -> info, failure -> warning,
    suspicious pattern / server error -> error).

    NOTE(review): the following hooks are called by this class but are not
    defined in it as visible here; they must be supplied by a subclass or
    mixin, otherwise the calling method raises ``AttributeError``:
    ``_get_geo_location``, ``_calculate_risk_score``,
    ``_is_suspicious_auth_failure``, ``_is_privilege_escalation_attempt``,
    ``_extract_security_headers``, ``_sanitize_stack_trace``,
    ``_trigger_security_alert``, ``_is_potential_ddos``.
    """

    # Markers matched as substrings of a parameter name after it has been
    # lower-cased and stripped of non-alphanumerics, so "api_key", "api-key"
    # and "ApiKey" all match "apikey".
    _SENSITIVE_KEY_MARKERS = ('password', 'token', 'apikey', 'secret', 'creditcard')

    # Severity label (lower-cased) -> logger method name. Labels that are not
    # real log levels ("high", "medium", "low") are mapped explicitly instead
    # of silently falling back to info.
    _SEVERITY_LEVELS = {
        'critical': 'critical',
        'fatal': 'critical',
        'high': 'error',
        'error': 'error',
        'medium': 'warning',
        'warning': 'warning',
        'warn': 'warning',
        'low': 'info',
        'info': 'info',
        'debug': 'debug',
    }

    # Severities (lower-cased) that additionally trigger a security alert.
    _ALERT_SEVERITIES = frozenset({'critical', 'high'})

    def __init__(self, service_name: str):
        """Bind a structlog logger named after ``service_name``."""
        self.logger = structlog.get_logger(service_name)
        self.service_name = service_name

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current time as a timezone-aware ISO-8601 UTC string."""
        return datetime.now(timezone.utc).isoformat()

    def log_authentication_attempt(self,
                                   username: str,
                                   auth_method: str,
                                   success: bool,
                                   ip_address: str,
                                   user_agent: str,
                                   failure_reason: Optional[str] = None,
                                   additional_context: Optional[Dict] = None):
        """Log an authentication attempt with security context.

        The username is never logged in clear text; only its truncated hash
        is recorded. Successful attempts are logged at info, failures at
        warning, and failures flagged by ``_is_suspicious_auth_failure`` are
        logged a second time at error.
        """
        event_data = {
            'event_type': 'authentication',
            'username': self._hash_sensitive(username),
            'auth_method': auth_method,
            'success': success,
            'ip_address': ip_address,
            'user_agent': user_agent,
            'timestamp': self._utc_timestamp(),
        }
        if not success and failure_reason:
            event_data['failure_reason'] = failure_reason
        if additional_context:
            event_data['context'] = additional_context

        # Geo lookup runs first so the risk score can take it into account.
        event_data['geo_location'] = self._get_geo_location(ip_address)
        event_data['risk_score'] = self._calculate_risk_score(event_data)

        if success:
            self.logger.info("authentication_success", **event_data)
            return

        self.logger.warning("authentication_failure", **event_data)
        # Alert on suspicious patterns
        if self._is_suspicious_auth_failure(event_data):
            self.logger.error("suspicious_authentication_pattern", **event_data)

    def log_authorization_check(self,
                                user_id: str,
                                resource: str,
                                action: str,
                                granted: bool,
                                required_permissions: List[str],
                                user_permissions: List[str],
                                request_context: Dict):
        """Log an authorization decision with its full context.

        Granted checks are logged at info. Denied checks include the sorted
        list of missing permissions and are logged at warning, or at error
        when ``_is_privilege_escalation_attempt`` flags the event.
        """
        event_data = {
            'event_type': 'authorization',
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'granted': granted,
            'required_permissions': required_permissions,
            'user_permissions': user_permissions,
            'request_id': request_context.get('request_id'),
            'api_endpoint': request_context.get('endpoint'),
            'http_method': request_context.get('method'),
        }
        if granted:
            self.logger.info("authorization_granted", **event_data)
            return

        # Sorted so the same denial always produces the same log line;
        # ``or ()`` tolerates callers passing None for either list.
        event_data['missing_permissions'] = sorted(
            set(required_permissions or ()) - set(user_permissions or ())
        )
        # Check for privilege escalation attempts
        if self._is_privilege_escalation_attempt(event_data):
            self.logger.error("privilege_escalation_attempt", **event_data)
        else:
            self.logger.warning("authorization_denied", **event_data)

    def log_api_request(self, request_data: Dict):
        """Log an incoming API request with security-relevant details.

        Query parameters are passed through ``_sanitize_params`` before being
        logged. Headers are not logged directly; only what
        ``_extract_security_headers`` returns is attached.
        """
        event_data = {
            'event_type': 'api_request',
            'request_id': request_data.get('request_id'),
            'method': request_data.get('method'),
            'path': request_data.get('path'),
            'query_params': self._sanitize_params(request_data.get('query_params')),
            'user_id': request_data.get('user_id'),
            'ip_address': request_data.get('ip_address'),
            'user_agent': request_data.get('user_agent'),
            'api_version': request_data.get('api_version'),
            'request_size': request_data.get('content_length', 0),
        }
        # Add security headers info
        security_headers = self._extract_security_headers(
            request_data.get('headers') or {}
        )
        if security_headers:
            event_data['security_headers'] = security_headers
        self.logger.info("api_request", **event_data)

    def log_api_response(self, response_data: Dict):
        """Log an API response; error responses carry extra detail.

        Level follows the status code: 5xx -> error, 4xx -> warning,
        everything else (including a missing status code) -> info.
        """
        raw_status = response_data.get('status_code')
        # A missing or None status must not break the numeric comparisons.
        status_code = raw_status or 0

        event_data = {
            'event_type': 'api_response',
            'request_id': response_data.get('request_id'),
            'status_code': raw_status,
            'response_time_ms': response_data.get('response_time_ms'),
            'response_size': response_data.get('response_size'),
        }

        if status_code >= 400:
            event_data['error_details'] = response_data.get('error_details')
            if status_code == 500:
                # Log sanitized stack trace for 500 errors
                event_data['stack_trace'] = self._sanitize_stack_trace(
                    response_data.get('stack_trace')
                )

        if status_code >= 500:
            emit = self.logger.error
        elif status_code >= 400:
            emit = self.logger.warning
        else:
            emit = self.logger.info
        emit("api_response", **event_data)

    def log_security_event(self,
                           event_type: str,
                           severity: str,
                           details: Dict,
                           user_id: Optional[str] = None,
                           ip_address: Optional[str] = None):
        """Log a security-specific event and alert on high severities.

        ``severity`` is matched case-insensitively against
        ``_SEVERITY_LEVELS``; unknown labels are logged at info. Severities
        in ``_ALERT_SEVERITIES`` also call ``_trigger_security_alert``.
        """
        event_data = {
            'event_type': f'security_{event_type}',
            'severity': severity,
            'details': details,
            'timestamp': self._utc_timestamp(),
        }
        if user_id:
            event_data['user_id'] = user_id
        if ip_address:
            event_data['ip_address'] = ip_address

        # Explicit table instead of getattr(logger, severity): the caller's
        # string can then never select an arbitrary logger attribute.
        severity_key = severity.lower()
        level_name = self._SEVERITY_LEVELS.get(severity_key, 'info')
        emit = getattr(self.logger, level_name)
        emit(f"security_event_{event_type}", **event_data)

        # Trigger alerts for high severity events
        if severity_key in self._ALERT_SEVERITIES:
            self._trigger_security_alert(event_data)

    def log_rate_limit_violation(self,
                                 user_id: str,
                                 ip_address: str,
                                 endpoint: str,
                                 limit_type: str,
                                 limit_value: int,
                                 current_value: int):
        """Log a rate-limit violation, escalating when it looks like DDoS.

        ``excess`` records how far ``current_value`` is above ``limit_value``.
        """
        event_data = {
            'event_type': 'rate_limit_violation',
            'user_id': user_id,
            'ip_address': ip_address,
            'endpoint': endpoint,
            'limit_type': limit_type,
            'limit_value': limit_value,
            'current_value': current_value,
            'excess': current_value - limit_value,
        }
        self.logger.warning("rate_limit_exceeded", **event_data)
        # Check for DDoS patterns
        if self._is_potential_ddos(event_data):
            self.logger.error("potential_ddos_attack", **event_data)

    def _hash_sensitive(self, value: str) -> str:
        """Return a 16-hex-char SHA-256 digest of ``value`` + service name.

        NOTE(review): the service name is a fixed, non-secret suffix, so
        low-entropy inputs such as usernames can be recovered by dictionary
        attack. Consider a keyed HMAC if pseudonymisation must hold against
        someone who can read the logs.
        """
        return hashlib.sha256(f"{value}{self.service_name}".encode()).hexdigest()[:16]

    def _is_sensitive_key(self, key) -> bool:
        """Return True when a parameter name looks like it holds a secret."""
        normalized = ''.join(ch for ch in str(key).lower() if ch.isalnum())
        return any(marker in normalized for marker in self._SENSITIVE_KEY_MARKERS)

    def _sanitize_params(self, params: Dict) -> Dict:
        """Return a copy of ``params`` with sensitive values redacted.

        A missing (``None``) or empty mapping yields an empty dict. Only
        top-level keys are inspected; nested structures are copied as-is.
        """
        if not params:
            return {}
        return {
            key: '[REDACTED]' if self._is_sensitive_key(key) else value
            for key, value in params.items()
        }
# Context manager for request logging
@contextmanager
def log_api_request_context(logger: "SecurityLogger", request):
    """Log one request, any unhandled exception, and the final response.

    Yields the request ID so the wrapped handler can correlate its own log
    lines. The ID is taken from the ``X-Request-ID`` header when that header
    is present and non-empty; otherwise ``generate_request_id()`` is called.

    An exception raised inside the ``with`` body is logged as a HIGH
    severity ``unhandled_exception`` security event and then re-raised. The
    response entry is always logged, with the elapsed time in milliseconds.

    NOTE(review): the response entry carries no status code because this
    context manager never sees the response object.
    """
    # ``or`` rather than a ``.get`` default: the fallback generator is only
    # invoked when it is actually needed, and an empty header is replaced.
    request_id = request.headers.get('X-Request-ID') or generate_request_id()
    # Monotonic clock: the measured duration cannot be skewed or go negative
    # if the wall clock is adjusted while the request is in flight.
    started = time.perf_counter()

    # Log request
    logger.log_api_request({
        'request_id': request_id,
        'method': request.method,
        'path': request.path,
        'query_params': dict(request.args),
        'user_id': getattr(request, 'user_id', None),
        'ip_address': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
        'content_length': request.content_length,
        'headers': dict(request.headers),
    })

    try:
        yield request_id
    except Exception as exc:
        # user_id is re-read here because it may only have been attached to
        # the request while the wrapped handler was running.
        logger.log_security_event(
            'unhandled_exception',
            'HIGH',
            {
                'exception_type': type(exc).__name__,
                'exception_message': str(exc),
                'traceback': traceback.format_exc(),
            },
            user_id=getattr(request, 'user_id', None),
            ip_address=request.remote_addr,
        )
        raise
    finally:
        # Log response
        elapsed_ms = (time.perf_counter() - started) * 1000
        logger.log_api_response({
            'request_id': request_id,
            'response_time_ms': elapsed_ms,
        })