Lack of Resources and Rate Limiting
Lack of Resources and Rate Limiting
APIs without proper rate limiting are vulnerable to denial-of-service attacks, brute force attempts, and resource exhaustion. This vulnerability affects both infrastructure costs and service availability. Sophisticated attackers can exploit missing rate limits to enumerate data, perform credential stuffing, or simply overwhelm services.
# Python comprehensive rate limiting implementation
import asyncio
import logging
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Tuple

import jwt
import redis
class RateLimitType(Enum):
    """Scope a rate limit applies to.

    The member value is embedded in the Redis key
    (``rate_limit:<value>:<key>``), so renaming a value orphans the
    counters already stored under the old name.
    """

    GLOBAL = "global"        # one shared bucket for the whole service
    USER = "user"            # bucket per authenticated user id
    IP = "ip"                # bucket per client address
    ENDPOINT = "endpoint"    # bucket per caller and "METHOD:path" pair
    COMPOSITE = "composite"  # multi-factor buckets (daily writes, expensive calls)
@dataclass
class RateLimitConfig:
    """Parameters of a single sliding-window rate limit."""

    # Maximum number of requests admitted inside one window.
    requests: int
    # Length of the sliding window, in seconds.
    window: int  # seconds
    # Optional request count above which a penalty is applied
    # (only has an effect together with ``penalty_duration``).
    burst: Optional[int] = None
    # Optional penalty length, handed to the limiter's penalty hook when
    # ``burst`` is exceeded.
    penalty_duration: Optional[int] = None
class ComprehensiveRateLimiter:
    """Layered sliding-window rate limiter backed by Redis sorted sets.

    A request is checked against several independent layers (global, IP,
    user, endpoint and composite limits) and is admitted only when every
    layer admits it.

    ``redis_client`` must be an asyncio-compatible client: the limiter uses
    ``async with client.pipeline()`` and awaits ``execute()``, ``set()`` and
    ``ttl()``.

    ``config`` maps ``'global'`` and ``'ip'`` to a ``RateLimitConfig`` and
    may map ``'endpoints'`` to a ``{"METHOD:path": RateLimitConfig}`` dict.
    A missing entry disables that layer.
    """

    # HTTP methods counted against the daily write quota.
    _WRITE_METHODS = frozenset({'POST', 'PUT', 'PATCH', 'DELETE'})

    def __init__(self, redis_client, config: Dict[str, RateLimitConfig]):
        self.redis = redis_client
        self.config = config

    async def check_rate_limit(self,
                               request_context: Dict) -> Tuple[bool, Dict]:
        """Check every rate limit layer for one request.

        Args:
            request_context: dict with ``'ip'``, ``'method'`` and ``'path'``,
                plus optional ``'user_id'`` and ``'user'`` (a dict that may
                carry a ``'tier'`` key).

        Returns:
            ``(allowed, info)`` where ``info`` holds ``'allowed'``,
            ``'limits'`` (per-layer results; ``'composite'`` is itself a
            dict of named results) and ``'retry_after'`` in seconds
            (``0`` when the request is allowed).
        """
        results = {}
        user_id = request_context.get('user_id')
        # ``user_id`` may be present but None for anonymous callers, so a
        # ``.get(..., default)`` would never fall back to the IP address.
        identity = user_id or request_context['ip']

        # Global rate limit
        results['global'] = await self._check_limit(
            RateLimitType.GLOBAL,
            "global",
            self.config.get('global')
        )

        # IP-based rate limit
        results['ip'] = await self._check_limit(
            RateLimitType.IP,
            request_context['ip'],
            self.config.get('ip')
        )

        # User-based rate limit (if authenticated)
        if user_id:
            user_config = self._get_user_rate_limit(request_context.get('user'))
            results['user'] = await self._check_limit(
                RateLimitType.USER,
                user_id,
                user_config
            )

        # Endpoint-specific rate limit
        endpoint_key = f"{request_context['method']}:{request_context['path']}"
        endpoint_config = self.config.get('endpoints', {}).get(endpoint_key)
        if endpoint_config:
            results['endpoint'] = await self._check_limit(
                RateLimitType.ENDPOINT,
                f"{identity}:{endpoint_key}",
                endpoint_config
            )

        # Composite rate limiting (e.g., writes per user per day)
        results['composite'] = await self._check_composite_limits(request_context)

        # ``results['composite']`` is a dict of checks rather than a check,
        # so walk the leaves instead of indexing ``['allowed']`` directly.
        allowed = all(check['allowed'] for check in self._iter_checks(results))
        retry_after = self._calculate_retry_after(results)

        return allowed, {
            'allowed': allowed,
            'limits': results,
            'retry_after': retry_after
        }

    async def _check_limit(self,
                           limit_type: RateLimitType,
                           key: str,
                           config: Optional[RateLimitConfig]) -> Dict:
        """Record one request in a sliding window and report the outcome.

        Returns ``{'allowed': True}`` when ``config`` is falsy (layer
        disabled). Otherwise returns ``allowed``, ``limit``, ``remaining``,
        ``reset`` (epoch seconds) and ``current`` (requests in the window
        including this one; ``None`` while a penalty is active).
        """
        if not config:
            return {'allowed': True}

        scoped_key = f"{limit_type.value}:{key}"
        full_key = f"rate_limit:{scoped_key}"
        now = time.time()

        # An active penalty rejects the request without touching the window.
        if config.penalty_duration:
            penalty_ttl = await self.redis.ttl(f"rate_limit:penalty:{scoped_key}")
            if penalty_ttl and penalty_ttl > 0:
                return {
                    'allowed': False,
                    'limit': config.requests,
                    'remaining': 0,
                    'reset': int(now + penalty_ttl),
                    'current': None,
                    'penalized': True
                }

        # Sliding window implementation
        window_start = now - config.window
        async with self.redis.pipeline() as pipe:
            # Remove old entries
            pipe.zremrangebyscore(full_key, 0, window_start)
            # Count the requests already in the window
            pipe.zcard(full_key)
            # Add current request; the random suffix keeps members unique
            # even when two requests share the same timestamp.
            pipe.zadd(full_key, {f"{now}:{uuid.uuid4().hex}": now})
            # Set expiration so idle keys are eventually dropped
            pipe.expire(full_key, config.window + 60)
            results = await pipe.execute()

        # ZCARD ran before ZADD, so add one to include the current request.
        current_count = results[1] + 1

        # Check burst limit and apply the configured penalty
        if config.burst and config.penalty_duration and current_count > config.burst:
            await self._apply_penalty(scoped_key, config.penalty_duration)

        return {
            'allowed': current_count <= config.requests,
            'limit': config.requests,
            'remaining': max(0, config.requests - current_count),
            'reset': int(now + config.window),
            'current': current_count
        }

    async def _apply_penalty(self, key: str, duration: int) -> None:
        """Block the scoped ``key`` for ``duration`` seconds via an expiring marker."""
        await self.redis.set(f"rate_limit:penalty:{key}", "1", ex=duration)

    def _get_user_rate_limit(self, user: Dict) -> RateLimitConfig:
        """Dynamic rate limits based on user tier.

        A missing user record or an unrecognised tier falls back to the
        ``'free'`` limits instead of returning ``None``, which would
        disable the user layer entirely.
        """
        tier_configs = {
            'free': RateLimitConfig(requests=100, window=3600),
            'basic': RateLimitConfig(requests=1000, window=3600),
            'premium': RateLimitConfig(requests=10000, window=3600),
            'enterprise': RateLimitConfig(requests=100000, window=3600)
        }
        tier = (user or {}).get('tier', 'free')
        return tier_configs.get(tier, tier_configs['free'])

    async def _check_composite_limits(self, context: Dict) -> Dict:
        """Check complex multi-factor rate limits.

        Returns a dict of named check results (``'daily_writes'``,
        ``'expensive_operations'``); it is empty when none applies.
        """
        checks = {}
        identity = context.get('user_id') or context['ip']

        # Write operations per user per day
        if context['method'] in self._WRITE_METHODS:
            checks['daily_writes'] = await self._check_limit(
                RateLimitType.COMPOSITE,
                f"writes:{identity}:daily",
                RateLimitConfig(requests=1000, window=86400)
            )

        # Expensive operations
        if context['path'].startswith('/api/reports/'):
            checks['expensive_operations'] = await self._check_limit(
                RateLimitType.COMPOSITE,
                f"expensive:{identity}",
                RateLimitConfig(requests=10, window=3600, burst=3)
            )

        return checks

    @staticmethod
    def _iter_checks(results: Dict):
        """Yield every leaf check dict in ``results``, descending into groups."""
        for value in results.values():
            if not isinstance(value, dict):
                continue
            if 'allowed' in value:
                yield value
            else:
                yield from ComprehensiveRateLimiter._iter_checks(value)

    def _calculate_retry_after(self, results: Dict) -> int:
        """Return the seconds to wait until every denying limit has reset.

        Returns ``0`` when no check denied the request, otherwise at least 1.
        """
        now = int(time.time())
        waits = [
            check['reset'] - now
            for check in self._iter_checks(results)
            if not check['allowed'] and 'reset' in check
        ]
        if not waits:
            return 0
        return max(1, max(waits))
# Middleware implementation
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
# ASGI application that the rate limiting middleware below is registered on.
app = FastAPI()
# Single limiter instance shared by all requests. `redis_client` and
# `rate_limit_config` are not defined in this excerpt and must be supplied
# by the surrounding application before this line runs.
rate_limiter = ComprehensiveRateLimiter(redis_client, rate_limit_config)
def _iter_limit_checks(limits):
    """Yield every leaf check dict in ``limits``, descending into nested groups."""
    for data in limits.values():
        if not isinstance(data, dict):
            continue
        if 'allowed' in data:
            yield data
        else:
            yield from _iter_limit_checks(data)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Apply the shared rate limiter to every HTTP request.

    Builds the request context (client IP, method, path and, when a valid
    bearer token is present, the user), asks ``rate_limiter`` for a verdict
    and either returns a 429 response or forwards the request and decorates
    the response with ``X-RateLimit-*`` headers.
    """
    # Extract request context
    context = {
        # ``request.client`` can be None (e.g. some test or socket transports).
        'ip': request.client.host if request.client else 'unknown',
        'method': request.method,
        'path': request.url.path,
        'user_id': None,
        'user': None
    }

    # Extract user from JWT if present
    auth_header = request.headers.get('Authorization', '')
    if auth_header.startswith('Bearer '):
        try:
            token = auth_header.split(' ')[1]
            payload = jwt.decode(token, PUBLIC_KEY, algorithms=['RS256'])
            user_id = payload['sub']
            user = await get_user(user_id)
        except Exception:
            # Best effort: an invalid token or failed lookup downgrades the
            # caller to anonymous (IP-based) limits. ``Exception`` rather
            # than a bare ``except`` so task cancellation still propagates.
            logging.getLogger(__name__).debug(
                "Ignoring unusable bearer token for rate limiting",
                exc_info=True
            )
        else:
            # Only mark the request as authenticated once both steps
            # succeeded, so ``user_id`` and ``user`` are always set together.
            context['user_id'] = user_id
            context['user'] = user or {}

    # Check rate limits
    allowed, limit_info = await rate_limiter.check_rate_limit(context)

    if not allowed:
        headers = {'Retry-After': str(limit_info['retry_after'])}
        # Report the limit that actually denied the request; the 'user'
        # layer is absent for anonymous callers and may not be the denier.
        denied = [
            check for check in _iter_limit_checks(limit_info['limits'])
            if not check.get('allowed', True) and 'limit' in check
        ]
        if denied:
            binding = max(denied, key=lambda check: check.get('reset', 0))
            headers['X-RateLimit-Limit'] = str(binding['limit'])
            headers['X-RateLimit-Remaining'] = str(binding['remaining'])
            headers['X-RateLimit-Reset'] = str(binding['reset'])
        return JSONResponse(
            status_code=429,
            content={
                'error': 'Too Many Requests',
                'message': 'Rate limit exceeded',
                'retry_after': limit_info['retry_after']
            },
            headers=headers
        )

    response = await call_next(request)

    # Add rate limit headers to response
    for limit_type, limit_data in limit_info['limits'].items():
        if isinstance(limit_data, dict) and 'limit' in limit_data:
            response.headers[f'X-RateLimit-{limit_type}-Limit'] = str(limit_data['limit'])
            response.headers[f'X-RateLimit-{limit_type}-Remaining'] = str(limit_data['remaining'])

    return response
Understanding and fixing these vulnerabilities requires continuous vigilance and regular security assessments. The next chapter explores API security compliance requirements and industry standards.

## API Security Compliance and Standards
Regulatory compliance and industry standards provide frameworks for implementing comprehensive API security. Organizations must navigate complex requirements from regulations like GDPR, HIPAA, and PCI DSS while adhering to security standards such as OAuth 2.0, OpenID Connect, and OWASP guidelines. This chapter examines key compliance requirements, implementation strategies, and best practices for maintaining compliant API security programs.