Distributed Architecture Patterns

Distributed Architecture Patterns

Scaling password hashing across multiple servers requires careful architectural decisions. Stateless authentication services scale horizontally, but shared state like rate limiting and session management needs coordination. Implement distributed rate limiting using Redis or similar systems to prevent attackers from bypassing limits by targeting different servers.

Load-balancing strategies must account for the CPU- and memory-intensive nature of password hashing. Round-robin distribution may overload servers if some requests require more intensive hashing than others. Implement least-connections or weighted round-robin algorithms that consider server capacity and current load. Monitor response times and adjust distribution accordingly.

class DistributedAuthenticationSystem:
    """Coordinate authentication work across several nodes via shared Redis state.

    The instance holds the node list, a Redis client and a per-node
    ``{'load': int, 'capacity': int}`` table whose ``load`` values are
    refreshed from Redis whenever the least-loaded node is requested.
    """

    # Compare-and-delete executed server-side so that checking lock ownership
    # and deleting the lock happen as one atomic step.
    _RELEASE_LOCK_SCRIPT = (
        "if redis.call('get', KEYS[1]) == ARGV[1] then "
        "return redis.call('del', KEYS[1]) "
        "else return 0 end"
    )

    class CircuitOpenError(Exception):
        """Raised by ``CircuitBreaker.call`` while the circuit is open."""

    class CircuitBreaker:
        """Stop calling a failing dependency until a cool-down has passed.

        States: ``'closed'`` (calls pass through), ``'open'`` (calls are
        rejected) and ``'half-open'`` (one trial call decides the next state).
        """

        def __init__(self, failure_threshold=5, recovery_timeout=60):
            self.failure_threshold = failure_threshold
            self.recovery_timeout = recovery_timeout  # seconds
            self.failures = 0  # consecutive failures since the last success
            self.last_failure_time = None  # time.monotonic() reading
            self.state = 'closed'  # closed, open, half-open

        def call(self, func, *args, **kwargs):
            """Invoke ``func(*args, **kwargs)`` under circuit-breaker control.

            Returns:
                Whatever ``func`` returns.

            Raises:
                DistributedAuthenticationSystem.CircuitOpenError: if the
                    circuit is open and the recovery timeout has not elapsed.
                Exception: any exception raised by ``func`` is re-raised
                    after being counted as a failure.
            """
            if self.state == 'open':
                # Monotonic clock: immune to wall-clock adjustments.
                elapsed = time.monotonic() - self.last_failure_time
                if elapsed <= self.recovery_timeout:
                    raise DistributedAuthenticationSystem.CircuitOpenError(
                        "Circuit breaker is open"
                    )
                self.state = 'half-open'

            try:
                result = func(*args, **kwargs)
            except Exception:
                self.failures += 1
                self.last_failure_time = time.monotonic()
                # A failed trial call re-opens immediately; otherwise open
                # once the consecutive-failure threshold is reached.
                if (self.state == 'half-open'
                        or self.failures >= self.failure_threshold):
                    self.state = 'open'
                raise

            # Any success clears the streak, so only *consecutive* failures
            # can open the circuit.
            self.failures = 0
            self.state = 'closed'
            return result

    def __init__(self, nodes: List[str], redis_cluster: "redis.RedisCluster"):
        self.nodes = nodes
        self.redis = redis_cluster
        self.node_stats = {node: {'load': 0, 'capacity': 100} for node in nodes}

    def select_authentication_node(self, username: str) -> str:
        """Pick the node that should authenticate ``username``.

        The username is hashed and mapped onto the node list by modulo, so a
        user keeps landing on the same node while the node list is unchanged.
        This is plain hash-modulo placement, not a consistent-hash ring:
        adding or removing a node remaps most users.

        Returns:
            The hashed node when it is healthy, otherwise the node with the
            lowest load-to-capacity ratio.

        Raises:
            ValueError: if no nodes are configured.
        """
        if not self.nodes:
            raise ValueError("no authentication nodes configured")

        # MD5 only spreads usernames across nodes; it protects nothing here.
        digest = hashlib.md5(username.encode()).hexdigest()
        preferred = self.nodes[int(digest[:8], 16) % len(self.nodes)]

        # NOTE(review): _is_node_healthy is not defined in the visible part
        # of this class -- confirm it exists elsewhere.
        if self._is_node_healthy(preferred):
            return preferred

        return self._select_least_loaded_node()

    def _select_least_loaded_node(self) -> str:
        """Refresh node loads from Redis and return the least-loaded node.

        Reads ``node_load:<node>`` for every node; a missing or empty value
        leaves the previously known load in place.

        Raises:
            ValueError: if no nodes are configured.
        """
        if not self.nodes:
            raise ValueError("no authentication nodes configured")

        for node in self.nodes:
            # setdefault covers nodes appended to self.nodes after __init__.
            stats = self.node_stats.setdefault(
                node, {'load': 0, 'capacity': 100}
            )
            load = self.redis.get(f"node_load:{node}")
            if load:
                stats['load'] = int(load)

        return min(self.nodes, key=self._load_ratio)

    def _load_ratio(self, node: str) -> float:
        """Return load / capacity for ``node``; zero capacity sorts last."""
        stats = self.node_stats[node]
        if not stats['capacity']:
            return float('inf')
        return stats['load'] / stats['capacity']

    @staticmethod
    def _encode_hash_value(value):
        """Return ``value`` in a form a Redis hash field can hold.

        Strings, bytes, ints and floats pass through unchanged.  Everything
        else (including ``bool`` and ``None``, which the redis-py encoder
        rejects) is stored as its JSON text.
        """
        import json

        if isinstance(value, bool) or not isinstance(
                value, (str, bytes, int, float)):
            return json.dumps(value, default=str)
        return value

    def implement_distributed_session(self, user_id: str, session_data: Dict,
                                      ttl_seconds: int = 3600) -> str:
        """Create a session record in Redis and return its new session id.

        The caller's ``session_data`` is copied, not modified.  ``user_id``
        and a naive-UTC ISO-8601 ``created`` timestamp are added to the copy,
        which is written as a hash under ``session:<id>``.  When
        ``session_data['critical']`` is truthy a second copy is written under
        ``session_backup:<id>`` through the same Redis client.

        Args:
            user_id: Identifier stored in the ``user_id`` field.
            session_data: Extra fields to store with the session.
            ttl_seconds: Expiry applied to each key written (default 1 hour).

        Returns:
            The generated URL-safe session id.
        """
        from datetime import timezone

        session_id = secrets.token_urlsafe(32)

        record = dict(session_data)
        record['user_id'] = user_id
        # Same naive-UTC text that utcnow().isoformat() produced, without
        # calling the deprecated utcnow().
        record['created'] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        payload = {
            field: self._encode_hash_value(value)
            for field, value in record.items()
        }

        keys = [f"session:{session_id}"]
        if record.get('critical'):
            keys.append(f"session_backup:{session_id}")

        for key in keys:
            self.redis.hset(key, mapping=payload)
            self.redis.expire(key, ttl_seconds)

        return session_id

    def coordinate_password_migration(self, username: str,
                                      lock_ttl: int = 30) -> bool:
        """Run the password migration for ``username`` on at most one node.

        A Redis lock (``SET NX EX``) with a random token guards the
        migration.  The lock is released by an atomic compare-and-delete so
        that a lock which expired and was re-acquired by another node is
        never removed by this one.

        Args:
            username: Account whose password record is being migrated.
            lock_ttl: Lock expiry in seconds; it must exceed the time the
                migration takes.

        Returns:
            True if the migration is complete (now or previously), False if
            another node currently holds the lock.
        """
        lock_key = f"migration_lock:{username}"
        # Random token: a timestamp could collide between nodes.
        token = secrets.token_hex(16)

        if not self.redis.set(lock_key, token, nx=True, ex=lock_ttl):
            # Another node is handling migration
            return False

        try:
            status_key = f"migration_status:{username}"
            # Accept bytes or str so a decode_responses client also works.
            if self.redis.get(status_key) in (b'completed', 'completed'):
                return True

            # Perform migration
            # ... migration logic ...

            self.redis.set(status_key, 'completed', ex=86400)
            return True
        finally:
            # Deletes the lock only if it still holds our token.
            self.redis.eval(self._RELEASE_LOCK_SCRIPT, 1, lock_key, token)

    def implement_circuit_breaker(self, node: str):
        """Return a new, closed ``CircuitBreaker`` with default settings.

        ``node`` is currently unused and every call returns an independent
        breaker, so the caller must keep the returned object (for example one
        per node) for failures to accumulate.
        """
        return DistributedAuthenticationSystem.CircuitBreaker()