Lack of Resources and Rate Limiting

Lack of Resources and Rate Limiting

APIs without proper rate limiting are vulnerable to denial-of-service attacks, brute force attempts, and resource exhaustion. This vulnerability affects both infrastructure costs and service availability. Sophisticated attackers can exploit missing rate limits to enumerate data, perform credential stuffing, or simply overwhelm services.

# Python comprehensive rate limiting implementation
import asyncio
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Tuple

import jwt
import redis

class RateLimitType(Enum):
    """Layers at which a rate limit can be applied.

    The member value is embedded in the Redis key built by
    ``ComprehensiveRateLimiter._check_limit``
    (``rate_limit:<value>:<key>``), so each layer counts independently.
    """
    GLOBAL = "global"        # single counter under the fixed key "global"
    USER = "user"            # keyed by the authenticated user id
    IP = "ip"                # keyed by the client IP address
    ENDPOINT = "endpoint"    # keyed by caller identity plus "METHOD:path"
    COMPOSITE = "composite"  # used for the write and expensive-operation limits

@dataclass
class RateLimitConfig:
    """Parameters of a single rate limit.

    Raises:
        ValueError: If ``requests`` is negative, ``window`` is not positive,
            or ``burst`` / ``penalty_duration`` is given as a negative number.
            Such values would otherwise produce a limit that silently never
            works (for example, a zero-length window never retains entries).
    """
    requests: int  # maximum number of requests permitted per window
    window: int  # length of the window, in seconds
    burst: Optional[int] = None  # request count above which a penalty may be applied
    penalty_duration: Optional[int] = None  # penalty length; presumably seconds, like `window`

    def __post_init__(self) -> None:
        if self.requests < 0:
            raise ValueError(f"requests must be >= 0, got {self.requests}")
        if self.window <= 0:
            raise ValueError(f"window must be > 0 seconds, got {self.window}")
        if self.burst is not None and self.burst < 0:
            raise ValueError(f"burst must be >= 0, got {self.burst}")
        if self.penalty_duration is not None and self.penalty_duration < 0:
            raise ValueError(
                f"penalty_duration must be >= 0, got {self.penalty_duration}"
            )

class ComprehensiveRateLimiter:
    """Layered sliding-window rate limiter backed by Redis sorted sets.

    Each request is checked against several independent limits (global,
    per-IP, per-user, per-endpoint and composite). The request is allowed
    only when every applicable limit allows it.

    Args:
        redis_client: Asynchronous Redis client. It must provide
            ``pipeline()`` usable as an async context manager and an
            awaitable ``set(name, value, ex=...)`` (assumed to be
            ``redis.asyncio.Redis`` or compatible).
        config: Limit configuration. The keys read are ``'global'`` and
            ``'ip'`` (one ``RateLimitConfig`` each) and ``'endpoints'`` (a
            mapping of ``"METHOD:/path"`` to ``RateLimitConfig``). A missing
            entry disables that layer.
    """

    def __init__(self, redis_client, config: Dict[str, RateLimitConfig]):
        self.redis = redis_client
        self.config = config

    async def check_rate_limit(self,
                              request_context: Dict) -> Tuple[bool, Dict]:
        """Check every rate-limit layer that applies to a request.

        Args:
            request_context: Mapping with ``'ip'``, ``'method'`` and
                ``'path'``, plus optional ``'user_id'`` and ``'user'``
                (a mapping that may carry a ``'tier'``).

        Returns:
            ``(allowed, info)`` where ``info`` has the keys ``'allowed'``,
            ``'limits'`` (per-layer results; ``'composite'`` is itself a
            mapping of named results) and ``'retry_after'`` (seconds, ``0``
            when the request is allowed).
        """
        results = {}

        user_id = request_context.get('user_id')
        # `dict.get(key, default)` only falls back when the key is absent, so a
        # present-but-None user_id must be handled explicitly; otherwise every
        # anonymous client would share the bucket named "None".
        client_key = user_id or request_context['ip']

        # Global rate limit
        results['global'] = await self._check_limit(
            RateLimitType.GLOBAL,
            "global",
            self.config.get('global')
        )

        # IP-based rate limit
        results['ip'] = await self._check_limit(
            RateLimitType.IP,
            request_context['ip'],
            self.config.get('ip')
        )

        # User-based rate limit (if authenticated)
        if user_id:
            user_config = self._get_user_rate_limit(request_context.get('user'))
            results['user'] = await self._check_limit(
                RateLimitType.USER,
                user_id,
                user_config
            )

        # Endpoint-specific rate limit
        endpoint_key = f"{request_context['method']}:{request_context['path']}"
        endpoint_config = self.config.get('endpoints', {}).get(endpoint_key)
        if endpoint_config:
            results['endpoint'] = await self._check_limit(
                RateLimitType.ENDPOINT,
                f"{client_key}:{endpoint_key}",
                endpoint_config
            )

        # Composite rate limiting (e.g., writes per user per day)
        results['composite'] = await self._check_composite_limits(request_context)

        # 'composite' is a mapping of checks rather than a check, so the
        # results are flattened before they are combined.
        allowed = all(check['allowed'] for check in self._iter_checks(results))
        retry_after = self._calculate_retry_after(results)

        return allowed, {
            'allowed': allowed,
            'limits': results,
            'retry_after': retry_after
        }

    @staticmethod
    def _iter_checks(results: Dict):
        """Yield every individual check result, flattening one nesting level.

        A value containing an ``'allowed'`` key is a check result; any other
        mapping is treated as a group of named check results.
        """
        for value in results.values():
            if not isinstance(value, dict):
                continue
            if 'allowed' in value:
                yield value
            else:
                yield from (v for v in value.values() if isinstance(v, dict))

    def _calculate_retry_after(self, results: Dict) -> int:
        """Return the longest wait, in seconds, among the denied checks.

        Returns ``0`` when no check denied the request.
        """
        waits = [
            check.get('retry_after', 0)
            for check in self._iter_checks(results)
            if not check['allowed']
        ]
        return max(waits, default=0)

    @staticmethod
    def _penalty_key(full_key: str) -> str:
        """Return the Redis key that marks an active penalty for a limit."""
        return f"{full_key}:penalty"

    async def _apply_penalty(self, key: str, duration: int) -> None:
        """Block the limit stored under ``key`` for ``duration`` seconds.

        The marker expires on its own. Applying a penalty again while one is
        active restarts the timer.
        """
        await self.redis.set(self._penalty_key(key), 1, ex=duration)

    async def _check_limit(self,
                          limit_type: RateLimitType,
                          key: str,
                          config: Optional[RateLimitConfig]) -> Dict:
        """Record the current request against one limit and evaluate it.

        Args:
            limit_type: Layer the limit belongs to; namespaces the Redis key.
            key: Identifier of the counted subject within that layer.
            config: Limit parameters, or ``None`` when the layer is disabled.

        Returns:
            ``{'allowed': True}`` when ``config`` is ``None``. Otherwise a
            mapping with ``'allowed'``, ``'limit'``, ``'remaining'``,
            ``'reset'`` (epoch seconds) and ``'current'`` (requests in the
            window including this one), plus ``'retry_after'`` (seconds) when
            the request is denied.
        """
        if config is None:
            return {'allowed': True}

        full_key = f"rate_limit:{limit_type.value}:{key}"

        # Sliding window: one sorted-set member per request, scored by its
        # timestamp.
        now = time.time()
        window_start = now - config.window

        async with self.redis.pipeline() as pipe:
            # Remaining lifetime of an active penalty (negative when absent)
            pipe.ttl(self._penalty_key(full_key))

            # Remove old entries
            pipe.zremrangebyscore(full_key, 0, window_start)

            # Count the window before this request is added
            pipe.zcard(full_key)

            # Add current request; the random suffix keeps members unique even
            # when two requests carry the same timestamp.
            pipe.zadd(full_key, {f"{now}:{uuid.uuid4().hex}": now})

            # Set expiration
            pipe.expire(full_key, config.window + 60)

            penalty_ttl, _, previous_count, _, _ = await pipe.execute()

        # The count was taken before the add, so include this request. Without
        # the +1 a limit of N would let N + 1 requests through.
        current_count = previous_count + 1
        over_limit = current_count > config.requests
        penalized = penalty_ttl is not None and penalty_ttl > 0

        # A penalty affects subsequent requests; it is enforced through the
        # TTL lookup above on the next call.
        if config.burst and config.penalty_duration and current_count > config.burst:
            await self._apply_penalty(full_key, config.penalty_duration)

        allowed = not over_limit and not penalized

        result = {
            'allowed': allowed,
            'limit': config.requests,
            'remaining': max(0, config.requests - current_count),
            'reset': int(now + config.window),
            'current': current_count
        }
        if not allowed:
            # A full window is a conservative upper bound for a count-based
            # denial: denied requests are recorded too, so the exact release
            # time depends on the caller's future behaviour.
            result['retry_after'] = max(
                config.window if over_limit else 0,
                penalty_ttl if penalized else 0,
            )
        return result

    def _get_user_rate_limit(self, user: Optional[Dict]) -> RateLimitConfig:
        """Return the hourly limit for a user's tier.

        A missing user, a missing tier and an unrecognised tier all fall back
        to the ``'free'`` limit, so a bad tier value can never disable the
        per-user limit.
        """
        tier_configs = {
            'free': RateLimitConfig(requests=100, window=3600),
            'basic': RateLimitConfig(requests=1000, window=3600),
            'premium': RateLimitConfig(requests=10000, window=3600),
            'enterprise': RateLimitConfig(requests=100000, window=3600)
        }

        tier = (user or {}).get('tier', 'free')
        return tier_configs.get(tier, tier_configs['free'])

    async def _check_composite_limits(self, context: Dict) -> Dict:
        """Check limits that combine caller identity with request traits.

        Returns:
            Mapping of check name (``'daily_writes'``,
            ``'expensive_operations'``) to its ``_check_limit`` result. Only
            the checks that apply to the request are present.
        """
        checks = {}
        # Authenticated callers are counted per user, anonymous ones per IP.
        identity = context.get('user_id') or context['ip']

        # Write operations per caller per day
        if context['method'] in ('POST', 'PUT', 'PATCH', 'DELETE'):
            checks['daily_writes'] = await self._check_limit(
                RateLimitType.COMPOSITE,
                f"writes:{identity}:daily",
                RateLimitConfig(requests=1000, window=86400)
            )

        # Expensive operations
        if context['path'].startswith('/api/reports/'):
            checks['expensive_operations'] = await self._check_limit(
                RateLimitType.COMPOSITE,
                f"expensive:{identity}",
                RateLimitConfig(requests=10, window=3600, burst=3)
            )

        return checks

# Middleware implementation
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# ASGI application that the rate-limiting middleware below is registered on.
app = FastAPI()
# Limiter instance used by the middleware.
# NOTE(review): `redis_client` and `rate_limit_config` are not defined in this
# snippet; they must exist before this line runs.
rate_limiter = ComprehensiveRateLimiter(redis_client, rate_limit_config)

def _first_denied_limit(limits: dict) -> Optional[dict]:
    """Return the first denied check that carries quota fields, if any.

    ``limits`` maps a layer name either to a check result or to a group of
    named check results; both shapes are searched.
    """
    for entry in limits.values():
        if not isinstance(entry, dict):
            continue
        candidates = [entry] if 'allowed' in entry else entry.values()
        for check in candidates:
            if (isinstance(check, dict)
                    and not check.get('allowed', True)
                    and 'limit' in check):
                return check
    return None


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Apply the rate limiter to every request.

    Denied requests get a 429 response with ``Retry-After`` and, when
    available, the quota of the limit that denied them. Allowed requests are
    passed on and the response is annotated with per-layer quota headers.
    """
    # Extract request context
    context = {
        # `request.client` can be None (e.g. some test clients and transports).
        'ip': request.client.host if request.client else 'unknown',
        'method': request.method,
        'path': request.url.path,
        'user_id': None,
        'user': None
    }

    # Extract user from JWT if present
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        try:
            token = auth_header.split(' ', 1)[1]
            payload = jwt.decode(token, PUBLIC_KEY, algorithms=['RS256'])
            user_id = payload['sub']
            user = await get_user(user_id)
        except Exception:
            # Best effort: an invalid token or a failed lookup leaves the
            # caller anonymous (IP-limited). `Exception` rather than a bare
            # `except` so task cancellation is not swallowed.
            pass
        else:
            # Only treat the caller as authenticated once both the token and
            # the user lookup have succeeded, so 'user_id' and 'user' are
            # always set together.
            if user is not None:
                context['user_id'] = user_id
                context['user'] = user

    # Check rate limits
    allowed, limit_info = await rate_limiter.check_rate_limit(context)

    if not allowed:
        headers = {}
        retry_after = limit_info['retry_after']
        if retry_after is not None:
            headers['Retry-After'] = str(retry_after)

        # Report the quota of the limit that actually denied the request.
        # Anonymous callers have no 'user' entry, so it cannot be assumed.
        denied = _first_denied_limit(limit_info['limits'])
        if denied is not None:
            headers['X-RateLimit-Limit'] = str(denied['limit'])
            headers['X-RateLimit-Remaining'] = str(denied['remaining'])
            headers['X-RateLimit-Reset'] = str(denied['reset'])

        return JSONResponse(
            status_code=429,
            content={
                'error': 'Too Many Requests',
                'message': 'Rate limit exceeded',
                'retry_after': retry_after
            },
            headers=headers
        )

    response = await call_next(request)

    # Add rate limit headers to response
    for limit_type, limit_data in limit_info['limits'].items():
        if isinstance(limit_data, dict) and 'limit' in limit_data:
            response.headers[f'X-RateLimit-{limit_type}-Limit'] = str(limit_data['limit'])
            response.headers[f'X-RateLimit-{limit_type}-Remaining'] = str(limit_data['remaining'])

    return response

Understanding and fixing these vulnerabilities requires continuous vigilance and regular security assessments. The next chapter explores API security compliance requirements and industry standards.

## API Security Compliance and Standards

Regulatory compliance and industry standards provide frameworks for implementing comprehensive API security. Organizations must navigate complex requirements from regulations like GDPR, HIPAA, and PCI DSS while adhering to security standards such as OAuth 2.0, OpenID Connect, and OWASP guidelines. This chapter examines key compliance requirements, implementation strategies, and best practices for maintaining compliant API security programs.