Authentication and Session Management
Authentication and Session Management
Secure authentication and session management are crucial for protecting user accounts and preventing unauthorized access. Python applications must implement proper password hashing, secure session handling, and protection against common authentication attacks.
import secrets
import hashlib
import bcrypt
from datetime import datetime, timedelta
from typing import Optional, Dict
import jwt
import redis
from functools import wraps
import time
class SecureAuthentication:
def __init__(self, redis_client: redis.Redis, jwt_secret: str):
self.redis = redis_client
self.jwt_secret = jwt_secret
self.failed_attempts = {} # In production, use Redis
def hash_password(self, password: str) -> str:
"""Securely hash passwords using bcrypt"""
# Generate a salt and hash the password
salt = bcrypt.gensalt(rounds=12) # Adjust rounds based on performance needs
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(self, password: str, hashed: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
def generate_secure_token(self) -> str:
"""Generate cryptographically secure random token"""
return secrets.token_urlsafe(32)
def create_session(self, user_id: int, ip_address: str) -> str:
"""Create secure session with JWT"""
# Check for rate limiting
if self._is_rate_limited(ip_address):
raise RuntimeError("Too many login attempts")
# Generate session ID
session_id = self.generate_secure_token()
# Create JWT token
payload = {
'user_id': user_id,
'session_id': session_id,
'ip': ip_address,
'exp': datetime.utcnow() + timedelta(hours=24),
'iat': datetime.utcnow()
}
token = jwt.encode(payload, self.jwt_secret, algorithm='HS256')
# Store session in Redis with expiration
session_data = {
'user_id': user_id,
'ip_address': ip_address,
'created_at': datetime.utcnow().isoformat(),
'last_activity': datetime.utcnow().isoformat()
}
self.redis.setex(
f"session:{session_id}",
timedelta(hours=24),
json.dumps(session_data)
)
return token
def validate_session(self, token: str, ip_address: str) -> Optional[Dict]:
"""Validate session token and check for anomalies"""
try:
# Decode JWT
payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
# Check IP address consistency
if payload.get('ip') != ip_address:
# Log potential session hijacking attempt
self._log_security_event('ip_mismatch', payload)
return None
# Verify session exists in Redis
session_id = payload.get('session_id')
session_data = self.redis.get(f"session:{session_id}")
if not session_data:
return None
# Update last activity
session = json.loads(session_data)
session['last_activity'] = datetime.utcnow().isoformat()
self.redis.setex(
f"session:{session_id}",
timedelta(hours=24),
json.dumps(session)
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
self._log_security_event('invalid_token', {'token': token[:20]})
return None
def _is_rate_limited(self, ip_address: str) -> bool:
"""Check if IP is rate limited"""
key = f"login_attempts:{ip_address}"
attempts = self.redis.incr(key)
if attempts == 1:
# Set expiration on first attempt
self.redis.expire(key, 900) # 15 minutes
return attempts > 5 # Max 5 attempts per 15 minutes
def _log_security_event(self, event_type: str, data: Dict):
"""Log security events for monitoring"""
event = {
'type': event_type,
'timestamp': datetime.utcnow().isoformat(),
'data': data
}
# In production, send to logging service
print(f"SECURITY_EVENT: {json.dumps(event)}")
def require_auth(f):
"""Decorator for protecting routes"""
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return jsonify({'error': 'No token provided'}), 401
auth = SecureAuthentication(redis_client, JWT_SECRET)
session = auth.validate_session(token, request.remote_addr)
if not session:
return jsonify({'error': 'Invalid token'}), 401
# Add user info to request context
g.user_id = session['user_id']
g.session_id = session['session_id']
return f(*args, **kwargs)
return decorated_function