Designing a Comprehensive Logging Strategy

Designing a Comprehensive Logging Strategy

API security logging requires capturing the right data at the right level of detail. Too little logging leaves security blind spots, while excessive logging creates noise that obscures important events and consumes excessive resources. The key lies in identifying what security-relevant events to log and structuring that data for effective analysis.

Security-relevant events extend beyond simple access logs. Authentication attempts, authorization decisions, input validation failures, rate limit violations, and unusual behavior patterns all provide valuable security intelligence. Each event type requires specific data points to enable meaningful analysis and incident investigation.

# Python comprehensive API security logging framework
import logging
import json
import time
import hashlib
import traceback
from datetime import datetime
from contextlib import contextmanager
from typing import Dict, Any, Optional
import structlog

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

class SecurityLogger:
    def __init__(self, service_name: str):
        self.logger = structlog.get_logger(service_name)
        self.service_name = service_name
        
    def log_authentication_attempt(self, 
                                 username: str,
                                 auth_method: str,
                                 success: bool,
                                 ip_address: str,
                                 user_agent: str,
                                 failure_reason: Optional[str] = None,
                                 additional_context: Optional[Dict] = None):
        """Log authentication attempts with security context"""
        event_data = {
            'event_type': 'authentication',
            'username': self._hash_sensitive(username),
            'auth_method': auth_method,
            'success': success,
            'ip_address': ip_address,
            'user_agent': user_agent,
            'timestamp': datetime.utcnow().isoformat(),
        }
        
        if not success and failure_reason:
            event_data['failure_reason'] = failure_reason
            
        if additional_context:
            event_data['context'] = additional_context
            
        # Add geographic information
        event_data['geo_location'] = self._get_geo_location(ip_address)
        
        # Risk scoring
        event_data['risk_score'] = self._calculate_risk_score(event_data)
        
        if success:
            self.logger.info("authentication_success", **event_data)
        else:
            self.logger.warning("authentication_failure", **event_data)
            
            # Alert on suspicious patterns
            if self._is_suspicious_auth_failure(event_data):
                self.logger.error("suspicious_authentication_pattern", **event_data)
    
    def log_authorization_check(self,
                              user_id: str,
                              resource: str,
                              action: str,
                              granted: bool,
                              required_permissions: List[str],
                              user_permissions: List[str],
                              request_context: Dict):
        """Log authorization decisions with full context"""
        event_data = {
            'event_type': 'authorization',
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'granted': granted,
            'required_permissions': required_permissions,
            'user_permissions': user_permissions,
            'request_id': request_context.get('request_id'),
            'api_endpoint': request_context.get('endpoint'),
            'http_method': request_context.get('method'),
        }
        
        if not granted:
            event_data['missing_permissions'] = list(
                set(required_permissions) - set(user_permissions)
            )
            
            # Check for privilege escalation attempts
            if self._is_privilege_escalation_attempt(event_data):
                self.logger.error("privilege_escalation_attempt", **event_data)
            else:
                self.logger.warning("authorization_denied", **event_data)
        else:
            self.logger.info("authorization_granted", **event_data)
    
    def log_api_request(self, request_data: Dict):
        """Log API request with security-relevant details"""
        # Extract and hash sensitive data
        event_data = {
            'event_type': 'api_request',
            'request_id': request_data.get('request_id'),
            'method': request_data.get('method'),
            'path': request_data.get('path'),
            'query_params': self._sanitize_params(request_data.get('query_params', {})),
            'user_id': request_data.get('user_id'),
            'ip_address': request_data.get('ip_address'),
            'user_agent': request_data.get('user_agent'),
            'api_version': request_data.get('api_version'),
            'request_size': request_data.get('content_length', 0),
        }
        
        # Add security headers info
        security_headers = self._extract_security_headers(request_data.get('headers', {}))
        if security_headers:
            event_data['security_headers'] = security_headers
            
        self.logger.info("api_request", **event_data)
    
    def log_api_response(self, response_data: Dict):
        """Log API response with security considerations"""
        event_data = {
            'event_type': 'api_response',
            'request_id': response_data.get('request_id'),
            'status_code': response_data.get('status_code'),
            'response_time_ms': response_data.get('response_time_ms'),
            'response_size': response_data.get('response_size'),
        }
        
        # Log errors with more detail
        if response_data.get('status_code', 0) >= 400:
            event_data['error_details'] = response_data.get('error_details')
            
            if response_data.get('status_code') == 500:
                # Log sanitized stack trace for 500 errors
                event_data['stack_trace'] = self._sanitize_stack_trace(
                    response_data.get('stack_trace')
                )
                
        self.logger.info("api_response", **event_data)
    
    def log_security_event(self, 
                         event_type: str,
                         severity: str,
                         details: Dict,
                         user_id: Optional[str] = None,
                         ip_address: Optional[str] = None):
        """Log security-specific events"""
        event_data = {
            'event_type': f'security_{event_type}',
            'severity': severity,
            'details': details,
            'timestamp': datetime.utcnow().isoformat(),
        }
        
        if user_id:
            event_data['user_id'] = user_id
        if ip_address:
            event_data['ip_address'] = ip_address
            
        # Map severity to log level
        log_method = getattr(self.logger, severity.lower(), self.logger.info)
        log_method(f"security_event_{event_type}", **event_data)
        
        # Trigger alerts for high severity events
        if severity in ['CRITICAL', 'HIGH']:
            self._trigger_security_alert(event_data)
    
    def log_rate_limit_violation(self,
                                user_id: str,
                                ip_address: str,
                                endpoint: str,
                                limit_type: str,
                                limit_value: int,
                                current_value: int):
        """Log rate limiting events"""
        event_data = {
            'event_type': 'rate_limit_violation',
            'user_id': user_id,
            'ip_address': ip_address,
            'endpoint': endpoint,
            'limit_type': limit_type,
            'limit_value': limit_value,
            'current_value': current_value,
            'excess': current_value - limit_value,
        }
        
        self.logger.warning("rate_limit_exceeded", **event_data)
        
        # Check for DDoS patterns
        if self._is_potential_ddos(event_data):
            self.logger.error("potential_ddos_attack", **event_data)
    
    def _hash_sensitive(self, value: str) -> str:
        """Hash sensitive values for logging"""
        return hashlib.sha256(f"{value}{self.service_name}".encode()).hexdigest()[:16]
    
    def _sanitize_params(self, params: Dict) -> Dict:
        """Remove sensitive parameters from logs"""
        sensitive_keys = {'password', 'token', 'api_key', 'secret', 'creditcard'}
        sanitized = {}
        
        for key, value in params.items():
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                sanitized[key] = '[REDACTED]'
            else:
                sanitized[key] = value
                
        return sanitized

# Context manager for request logging
@contextmanager
def log_api_request_context(logger: SecurityLogger, request):
    """Context manager for complete request/response logging"""
    request_id = request.headers.get('X-Request-ID', generate_request_id())
    start_time = time.time()
    
    # Log request
    request_data = {
        'request_id': request_id,
        'method': request.method,
        'path': request.path,
        'query_params': dict(request.args),
        'user_id': getattr(request, 'user_id', None),
        'ip_address': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
        'content_length': request.content_length,
        'headers': dict(request.headers),
    }
    
    logger.log_api_request(request_data)
    
    try:
        yield request_id
        
    except Exception as e:
        # Log exceptions
        logger.log_security_event(
            'unhandled_exception',
            'HIGH',
            {
                'exception_type': type(e).__name__,
                'exception_message': str(e),
                'traceback': traceback.format_exc(),
            },
            user_id=getattr(request, 'user_id', None),
            ip_address=request.remote_addr
        )
        raise
        
    finally:
        # Log response
        response_time_ms = (time.time() - start_time) * 1000
        
        response_data = {
            'request_id': request_id,
            'response_time_ms': response_time_ms,
        }
        
        logger.log_api_response(response_data)