Authentication and Session Management

Authentication and Session Management

Secure authentication and session management are crucial for protecting user accounts and preventing unauthorized access. Python applications must implement proper password hashing, secure session handling, and protection against common authentication attacks.

import secrets
import hashlib
import bcrypt
from datetime import datetime, timedelta
from typing import Optional, Dict
import jwt
import redis
from functools import wraps
import time

class SecureAuthentication:
    def __init__(self, redis_client: redis.Redis, jwt_secret: str):
        self.redis = redis_client
        self.jwt_secret = jwt_secret
        self.failed_attempts = {}  # In production, use Redis
    
    def hash_password(self, password: str) -> str:
        """Securely hash passwords using bcrypt"""
        # Generate a salt and hash the password
        salt = bcrypt.gensalt(rounds=12)  # Adjust rounds based on performance needs
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')
    
    def verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash"""
        return bcrypt.checkpw(
            password.encode('utf-8'), 
            hashed.encode('utf-8')
        )
    
    def generate_secure_token(self) -> str:
        """Generate cryptographically secure random token"""
        return secrets.token_urlsafe(32)
    
    def create_session(self, user_id: int, ip_address: str) -> str:
        """Create secure session with JWT"""
        # Check for rate limiting
        if self._is_rate_limited(ip_address):
            raise RuntimeError("Too many login attempts")
        
        # Generate session ID
        session_id = self.generate_secure_token()
        
        # Create JWT token
        payload = {
            'user_id': user_id,
            'session_id': session_id,
            'ip': ip_address,
            'exp': datetime.utcnow() + timedelta(hours=24),
            'iat': datetime.utcnow()
        }
        
        token = jwt.encode(payload, self.jwt_secret, algorithm='HS256')
        
        # Store session in Redis with expiration
        session_data = {
            'user_id': user_id,
            'ip_address': ip_address,
            'created_at': datetime.utcnow().isoformat(),
            'last_activity': datetime.utcnow().isoformat()
        }
        
        self.redis.setex(
            f"session:{session_id}",
            timedelta(hours=24),
            json.dumps(session_data)
        )
        
        return token
    
    def validate_session(self, token: str, ip_address: str) -> Optional[Dict]:
        """Validate session token and check for anomalies"""
        try:
            # Decode JWT
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            
            # Check IP address consistency
            if payload.get('ip') != ip_address:
                # Log potential session hijacking attempt
                self._log_security_event('ip_mismatch', payload)
                return None
            
            # Verify session exists in Redis
            session_id = payload.get('session_id')
            session_data = self.redis.get(f"session:{session_id}")
            
            if not session_data:
                return None
            
            # Update last activity
            session = json.loads(session_data)
            session['last_activity'] = datetime.utcnow().isoformat()
            self.redis.setex(
                f"session:{session_id}",
                timedelta(hours=24),
                json.dumps(session)
            )
            
            return payload
            
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            self._log_security_event('invalid_token', {'token': token[:20]})
            return None
    
    def _is_rate_limited(self, ip_address: str) -> bool:
        """Check if IP is rate limited"""
        key = f"login_attempts:{ip_address}"
        attempts = self.redis.incr(key)
        
        if attempts == 1:
            # Set expiration on first attempt
            self.redis.expire(key, 900)  # 15 minutes
        
        return attempts > 5  # Max 5 attempts per 15 minutes
    
    def _log_security_event(self, event_type: str, data: Dict):
        """Log security events for monitoring"""
        event = {
            'type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'data': data
        }
        # In production, send to logging service
        print(f"SECURITY_EVENT: {json.dumps(event)}")

def require_auth(f):
    """Decorator for protecting routes"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization', '').replace('Bearer ', '')
        
        if not token:
            return jsonify({'error': 'No token provided'}), 401
        
        auth = SecureAuthentication(redis_client, JWT_SECRET)
        session = auth.validate_session(token, request.remote_addr)
        
        if not session:
            return jsonify({'error': 'Invalid token'}), 401
        
        # Add user info to request context
        g.user_id = session['user_id']
        g.session_id = session['session_id']
        
        return f(*args, **kwargs)
    
    return decorated_function