Caching Strategies
Caching Strategies
While password hashes should never be cached in their entirety (doing so would defeat the purpose of hashing), strategic caching of intermediate results can improve performance. Authentication tokens, session data, and negative cache entries (which track recent failed attempts) reduce how often expensive password verification operations have to run.
Token-based authentication reduces password hashing frequency. After successful authentication, issue time-limited tokens that subsequent requests can present. This amortizes the cost of password hashing across many requests. Implement token revocation carefully to maintain security while benefiting from performance improvements. Consider short-lived access tokens with longer-lived refresh tokens for optimal balance.
import concurrent.futures
import hashlib
import hmac
import json
import threading
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, Optional

import redis
class AuthenticationCache:
    """Redis-backed helpers that reduce how often passwords must be verified.

    The class issues and validates HMAC-signed, time-limited tokens, keeps a
    counter of recent failed logins per username, caches bcrypt cost factors,
    and can coalesce concurrent authentication requests.  Hit/miss counters
    are kept in ``self.stats`` (in-process only, not shared through Redis).

    NOTE: ``implement_request_coalescing`` calls ``self._do_authenticate``,
    which this class does not define; it must be supplied by a subclass or
    assigned on the instance.
    """

    def __init__(self, redis_client: "redis.Redis", token_secret: bytes):
        """Store the Redis client and the HMAC key, and zero the counters.

        Args:
            redis_client: Client used for every cache read and write.
            token_secret: Key used to sign and verify tokens with HMAC-SHA256.
        """
        self.redis = redis_client
        self.token_secret = token_secret
        self.stats = {
            'cache_hits': 0,
            'cache_misses': 0,
            'token_validations': 0,
            'password_verifications': 0,
        }

    def _sign(self, payload_json: str) -> str:
        """Return the hex HMAC-SHA256 of *payload_json* under the token secret."""
        return hmac.new(
            self.token_secret,
            payload_json.encode(),
            hashlib.sha256,
        ).hexdigest()

    @staticmethod
    def _bcrypt_cost_key(password_hash: str) -> Optional[str]:
        """Return the Redis key for a hash's cached cost, or None if the
        string does not start with ``$2``.

        The key uses the first 29 characters of the hash; for a standard
        bcrypt string that prefix is the version/cost header plus the salt.
        """
        if not password_hash.startswith('$2'):
            return None
        return f"bcrypt_cost:{password_hash[:29]}"

    def create_auth_token(self, user_id: str, duration_minutes: int = 15) -> str:
        """Create a short-lived signed token and register it in Redis.

        The token has the form ``<json payload>:<hex signature>``.  A Redis
        entry with the same lifetime is written so that deleting the entry
        revokes the token.

        Args:
            user_id: Identifier embedded in the token payload.
            duration_minutes: Token lifetime in minutes.

        Returns:
            The signed token string.
        """
        # time.time() is a true epoch timestamp.  A naive
        # datetime.utcnow().timestamp() is interpreted as local time and is
        # therefore shifted by the host's UTC offset.
        now = time.time()
        payload = {
            'user_id': user_id,
            'created': now,
            'expires': now + duration_minutes * 60,
        }
        # sort_keys keeps the serialisation (and thus the signature) stable.
        payload_json = json.dumps(payload, sort_keys=True)
        signature = self._sign(payload_json)
        token = f"{payload_json}:{signature}"

        # Presence of this key is what validate_token treats as "not revoked".
        cache_key = f"auth_token:{user_id}:{signature[:8]}"
        self.redis.setex(cache_key, duration_minutes * 60, token)
        return token

    def validate_token(self, token: str) -> Optional[str]:
        """Validate a token without touching the password hash.

        Args:
            token: A string produced by ``create_auth_token``.

        Returns:
            The ``user_id`` from the payload when the signature matches, the
            token has not expired and its Redis entry still exists; otherwise
            ``None``.  Malformed input and cache errors also yield ``None``.
        """
        self.stats['token_validations'] += 1
        try:
            # The signature is hex and contains no ':', so split from the right.
            payload_json, signature = token.rsplit(':', 1)

            # Authenticate the payload before parsing it.
            if not hmac.compare_digest(signature, self._sign(payload_json)):
                return None

            payload = json.loads(payload_json)
            if payload['expires'] < time.time():
                return None

            cache_key = f"auth_token:{payload['user_id']}:{signature[:8]}"
            if not self.redis.exists(cache_key):
                # Entry gone: the token was revoked (or evicted).
                self.stats['cache_misses'] += 1
                return None

            self.stats['cache_hits'] += 1
            return payload['user_id']
        except Exception:
            # Fail closed on anything unexpected, but let KeyboardInterrupt
            # and SystemExit propagate (a bare ``except:`` would hide them).
            return None

    def implement_negative_cache(self, username: str, duration_seconds: int = 300):
        """Record one failed authentication attempt for *username*.

        Args:
            username: Account the failed attempt was made against.
            duration_seconds: Lifetime of the failure counter, set when the
                counter is created.

        Returns:
            The number of failures recorded in the current window.
        """
        key = f"auth_failed:{username}"
        current = self.redis.incr(key)
        # INCR and EXPIRE are separate commands.  If a previous caller died
        # between them the counter would live forever, so also (re)apply the
        # expiry whenever the key is found without one (TTL == -1).
        if current == 1 or self.redis.ttl(key) == -1:
            self.redis.expire(key, duration_seconds)
        return current

    def check_negative_cache(self, username: str) -> Optional[int]:
        """Return the recent failure count for *username*, or None if absent."""
        failures = self.redis.get(f"auth_failed:{username}")
        if failures:
            self.stats['cache_hits'] += 1
            return int(failures)
        self.stats['cache_misses'] += 1
        return None

    def cache_bcrypt_cost(self, password_hash: str, cost: int):
        """Cache the cost factor of a bcrypt hash for 24 hours.

        Strings that do not start with ``$2`` are ignored.
        """
        key = self._bcrypt_cost_key(password_hash)
        if key is not None:
            self.redis.setex(key, 86400, str(cost))  # 86400 s = 24 hours

    def get_bcrypt_cost(self, password_hash: str) -> Optional[int]:
        """Return the cached cost factor for *password_hash*, if any.

        Returns ``None`` when nothing is cached or when the string does not
        start with ``$2`` (the latter is not counted as a cache miss).
        """
        key = self._bcrypt_cost_key(password_hash)
        if key is None:
            return None
        cost = self.redis.get(key)
        if cost:
            self.stats['cache_hits'] += 1
            return int(cost)
        self.stats['cache_misses'] += 1
        return None

    def implement_request_coalescing(self):
        """Build an authenticate function that coalesces identical requests.

        Concurrent calls with the same username AND the same password share a
        single ``self._do_authenticate`` call.  Calls with a different
        password are never merged, so one caller can never receive the result
        of somebody else's credentials.

        Returns:
            A callable ``coalesced_authenticate(username, password)`` that
            returns whatever ``self._do_authenticate`` returns and re-raises
            whatever it raises (to the caller that ran it and to any waiters).
        """
        pending_auth = {}  # (username, password digest) -> Future
        pending_lock = threading.Lock()

        def coalesced_authenticate(username: str, password: str):
            """Authenticate with request coalescing"""
            # Key on a keyed digest rather than the plaintext so the password
            # itself is not held in the pending table.
            raw_password = password.encode() if isinstance(password, str) else password
            request_key = (
                username,
                hmac.new(self.token_secret, raw_password, hashlib.sha256).digest(),
            )

            # Check-and-insert must be atomic or two threads could both
            # believe they are the first request.
            with pending_lock:
                future = pending_auth.get(request_key)
                is_leader = future is None
                if is_leader:
                    future = concurrent.futures.Future()
                    pending_auth[request_key] = future

            if not is_leader:
                # Wait for the in-progress authentication.
                return future.result()

            try:
                result = self._do_authenticate(username, password)
            except BaseException as exc:
                # Always complete the future, otherwise waiters block forever.
                future.set_exception(exc)
                raise
            else:
                future.set_result(result)
                return result
            finally:
                with pending_lock:
                    pending_auth.pop(request_key, None)

        return coalesced_authenticate

    def get_cache_statistics(self) -> Dict:
        """Summarise the in-process counters.

        Returns:
            A dict with the raw counters plus ``hit_rate`` (hits as a
            percentage of hits + misses) and ``cache_effectiveness`` (token
            validations as a percentage of token validations + password
            verifications).  Both percentages are 0 when their denominator
            is 0.
        """
        hits = self.stats['cache_hits']
        misses = self.stats['cache_misses']
        token_validations = self.stats['token_validations']
        password_verifications = self.stats['password_verifications']

        total_operations = hits + misses
        total_auth = token_validations + password_verifications

        return {
            'hit_rate': hits / total_operations * 100 if total_operations > 0 else 0,
            'total_operations': total_operations,
            'cache_hits': hits,
            'cache_misses': misses,
            'token_validations': token_validations,
            'password_verifications': password_verifications,
            'cache_effectiveness': (
                token_validations / total_auth * 100 if total_auth > 0 else 0
            ),
        }
class DistributedRateLimiter:
    """Distributed rate limiting for authentication, backed by Redis."""

    def __init__(self, redis_client: "redis.Redis"):
        """Store the Redis client used for all rate-limit bookkeeping."""
        self.redis = redis_client

    def check_rate_limit_sliding_window(self, key: str,
                                        limit: int,
                                        window_seconds: int) -> bool:
        """Sliding-window rate limiting using a Redis sorted set.

        Each call is recorded in the sorted set at *key*, scored by the
        current time.  Entries older than the window are dropped first, then
        the remaining entries are counted before the current call is added.
        Note that the current call is recorded even when it is rejected.

        Args:
            key: Redis key identifying the limited subject.
            limit: Maximum number of calls allowed inside the window.
            window_seconds: Length of the sliding window in seconds.

        Returns:
            True if fewer than *limit* calls were already in the window
            (the request is allowed), False otherwise.
        """
        now = time.time()
        window_start = now - window_seconds

        # Sorted-set members must be unique.  Using only the timestamp lets
        # two calls with the same clock reading overwrite each other and be
        # counted once, so a random suffix is appended.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop entries outside the window
        pipe.zcard(key)                              # count what is left
        pipe.zadd(key, {member: now})                # record this request
        # EXPIRE takes whole seconds; int() + 1 keeps the key at least as
        # long as the window and is unchanged for integer windows.
        pipe.expire(key, int(window_seconds) + 1)
        results = pipe.execute()

        count = results[1]  # ZCARD result: calls seen before this one
        return count < limit