Caching Strategies

While password hashes should never be cached in their entirety (which would defeat the purpose of hashing), strategic caching of intermediate results can improve performance. Authentication tokens, session data, and negative cache entries (which track recent failed attempts) reduce the frequency of expensive password verification operations.

Token-based authentication reduces password hashing frequency. After successful authentication, issue time-limited tokens that subsequent requests can present. This amortizes the cost of password hashing across many requests. Implement token revocation carefully to maintain security while benefiting from performance improvements. Consider short-lived access tokens with longer-lived refresh tokens for optimal balance.

import hashlib
import hmac
import json
import time
import uuid
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, Optional

import redis

class AuthenticationCache:
    """Intelligent caching for authentication operations.

    Issues HMAC-SHA256-signed, time-limited tokens so repeated requests can
    be authorized without re-running an expensive password hash, maintains a
    negative cache of recent failures, caches bcrypt cost factors, and tracks
    hit/miss statistics.
    """

    def __init__(self, redis_client: redis.Redis, token_secret: bytes):
        """
        Args:
            redis_client: Connection used for token / negative-cache storage.
            token_secret: Key material for HMAC-SHA256 token signatures.
        """
        self.redis = redis_client
        self.token_secret = token_secret
        # Counters surfaced by get_cache_statistics().
        self.stats = {
            'cache_hits': 0,
            'cache_misses': 0,
            'token_validations': 0,
            'password_verifications': 0
        }

    def create_auth_token(self, user_id: str, duration_minutes: int = 15) -> str:
        """Create a short-lived, HMAC-signed authentication token.

        Args:
            user_id: Identifier embedded in the token payload.
            duration_minutes: Token lifetime.

        Returns:
            A "<payload-json>:<hex-signature>" string.
        """
        # Use epoch seconds directly. The previous datetime.utcnow().timestamp()
        # treated the naive datetime as *local* time, yielding wrong epoch
        # values on any host not running in UTC.
        now = time.time()
        payload = {
            'user_id': user_id,
            'created': now,
            'expires': now + duration_minutes * 60
        }

        # Sign the canonical (sorted-key) JSON so validation can recompute
        # the exact same byte string.
        payload_json = json.dumps(payload, sort_keys=True)
        signature = hmac.new(
            self.token_secret,
            payload_json.encode(),
            hashlib.sha256
        ).hexdigest()

        token = f"{payload_json}:{signature}"

        # Store a revocable record; deleting this key revokes the token
        # before its natural expiry.
        cache_key = f"auth_token:{user_id}:{signature[:8]}"
        self.redis.setex(
            cache_key,
            duration_minutes * 60,
            token
        )

        return token

    def validate_token(self, token: str) -> Optional[str]:
        """Validate a token without any password verification.

        Returns:
            The user id on success; None for malformed, forged, expired,
            or revoked tokens.
        """
        self.stats['token_validations'] += 1

        try:
            payload_json, signature = token.rsplit(':', 1)
            payload = json.loads(payload_json)

            # Recompute the signature and compare in constant time to avoid
            # leaking information through timing.
            expected_signature = hmac.new(
                self.token_secret,
                payload_json.encode(),
                hashlib.sha256
            ).hexdigest()

            if not hmac.compare_digest(signature, expected_signature):
                return None

            # Reject expired tokens (epoch-seconds comparison, matching
            # create_auth_token).
            if payload['expires'] < time.time():
                return None

            # A missing cache entry means the token was revoked (or aged out
            # of Redis). Count it as a miss for stats consistency.
            cache_key = f"auth_token:{payload['user_id']}:{signature[:8]}"
            if not self.redis.exists(cache_key):
                self.stats['cache_misses'] += 1
                return None  # Token was revoked

            self.stats['cache_hits'] += 1
            return payload['user_id']

        except (ValueError, KeyError, TypeError):
            # Malformed token: wrong structure, invalid JSON, or missing
            # payload fields. (json.JSONDecodeError subclasses ValueError.)
            # The previous bare `except:` also swallowed KeyboardInterrupt
            # and SystemExit.
            return None

    def implement_negative_cache(self, username: str, duration_seconds: int = 300):
        """Record a failed authentication attempt for *username*.

        Returns:
            The failure count within the current window.
        """
        key = f"auth_failed:{username}"
        current = self.redis.incr(key)

        # Start the expiry window only on the first failure so the counter
        # resets duration_seconds after the initial miss.
        if current == 1:
            self.redis.expire(key, duration_seconds)

        return current

    def check_negative_cache(self, username: str) -> Optional[int]:
        """Return the recent failure count for *username*, or None if clean."""
        key = f"auth_failed:{username}"
        failures = self.redis.get(key)

        if failures:
            self.stats['cache_hits'] += 1
            return int(failures)

        self.stats['cache_misses'] += 1
        return None

    def cache_bcrypt_cost(self, password_hash: str, cost: int):
        """Cache the bcrypt cost factor keyed by the hash's salt prefix."""
        # Modular-crypt bcrypt strings start with "$2"; the first 29 chars
        # (version + cost + 22-char salt) uniquely identify the salt.
        if password_hash.startswith('$2'):
            salt = password_hash[:29]
            key = f"bcrypt_cost:{salt}"
            self.redis.setex(key, 86400, str(cost))  # Cache for 24 hours

    def get_bcrypt_cost(self, password_hash: str) -> Optional[int]:
        """Retrieve a previously cached bcrypt cost factor, if any."""
        if password_hash.startswith('$2'):
            salt = password_hash[:29]
            key = f"bcrypt_cost:{salt}"
            cost = self.redis.get(key)

            if cost:
                self.stats['cache_hits'] += 1
                return int(cost)

        self.stats['cache_misses'] += 1
        return None

    def implement_request_coalescing(self):
        """Build a closure that coalesces concurrent auth requests per user.

        Returns:
            A callable ``authenticate(username, password)``. While an
            authentication for the same username is in flight, other callers
            block on its outcome instead of re-running the password hash.
        """
        import concurrent.futures
        import threading

        pending_auth = {}        # username -> Future for the in-flight attempt
        lock = threading.Lock()  # guards pending_auth; coalescing implies threads

        def coalesced_authenticate(username: str, password: str):
            """Authenticate, piggybacking on any in-flight attempt."""
            with lock:
                future = pending_auth.get(username)
                is_leader = future is None
                if is_leader:
                    future = concurrent.futures.Future()
                    pending_auth[username] = future

            if not is_leader:
                # Another caller is mid-authentication; share its outcome.
                return future.result()

            try:
                result = self._do_authenticate(username, password)
            except BaseException as exc:
                # Propagate failure to every coalesced waiter. The original
                # deleted the future without resolving it, leaving waiters
                # blocked forever in future.result().
                future.set_exception(exc)
                raise
            else:
                future.set_result(result)
                return result
            finally:
                with lock:
                    pending_auth.pop(username, None)

        return coalesced_authenticate

    def get_cache_statistics(self) -> Dict:
        """Summarize cache performance.

        Returns:
            Dict with the hit rate (%), raw counters, and the percentage of
            authentications served by token validation rather than password
            verification ('cache_effectiveness').
        """
        hits = self.stats['cache_hits']
        misses = self.stats['cache_misses']
        tokens = self.stats['token_validations']
        passwords = self.stats['password_verifications']

        total_operations = hits + misses
        hit_rate = hits / total_operations * 100 if total_operations > 0 else 0

        auth_total = tokens + passwords
        effectiveness = tokens / auth_total * 100 if auth_total > 0 else 0

        return {
            'hit_rate': hit_rate,
            'total_operations': total_operations,
            'cache_hits': hits,
            'cache_misses': misses,
            'token_validations': tokens,
            'password_verifications': passwords,
            'cache_effectiveness': effectiveness
        }

class DistributedRateLimiter:
    """Distributed rate limiting for authentication, backed by Redis."""

    def __init__(self, redis_client: redis.Redis):
        """
        Args:
            redis_client: Shared connection so limits apply across processes.
        """
        self.redis = redis_client

    def check_rate_limit_sliding_window(self, key: str,
                                      limit: int,
                                      window_seconds: int) -> bool:
        """Sliding-window rate limiting using a Redis sorted set.

        Each request is stored as a sorted-set member scored by its arrival
        time; entries older than the window are pruned on every check.

        Args:
            key: Redis key identifying the client/bucket being limited.
            limit: Maximum requests allowed within the window.
            window_seconds: Window length in seconds.

        Returns:
            True if the request is allowed, False if the limit is reached.
            NOTE: rejected requests are still recorded, so they count
            against subsequent checks within the window.
        """
        now = time.time()
        window_start = now - window_seconds

        # Pipeline all four commands into a single round trip.
        pipe = self.redis.pipeline()

        # Remove entries that have fallen out of the window.
        pipe.zremrangebyscore(key, 0, window_start)

        # Count surviving entries (i.e. requests before this one).
        pipe.zcard(key)

        # Record this request with a unique member. The original used
        # str(now) as the member, so two requests arriving at the same
        # timestamp overwrote each other and were undercounted.
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})

        # Expire idle keys so abandoned buckets don't accumulate.
        pipe.expire(key, window_seconds + 1)

        results = pipe.execute()
        count = results[1]  # zcard result: requests already in the window

        return count < limit