Secure User Registration and Authentication System
Secure User Registration and Authentication System
User registration and authentication form the foundation of application security. This example demonstrates building a complete authentication system with proper security controls, from initial registration through session management.
# Python - Secure User Registration and Authentication
import secrets
import re
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Tuple
import bcrypt
import jwt
from flask import Flask, request, jsonify, session
from functools import wraps
import redis
import pyotp
from email_validator import validate_email, EmailNotValidError
from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import hashlib
import base64
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
email = Column(String(255), unique=True, nullable=False)
username = Column(String(50), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
email_verified = Column(Boolean, default=False)
totp_secret = Column(String(32), nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
last_login = Column(DateTime, nullable=True)
failed_login_attempts = Column(Integer, default=0)
locked_until = Column(DateTime, nullable=True)
class SecureAuthenticationSystem:
def __init__(self, app: Flask, db_url: str, redis_client: redis.Redis):
self.app = app
self.db_engine = create_engine(db_url)
self.Session = sessionmaker(bind=self.db_engine)
self.redis = redis_client
# Security configuration
self.jwt_secret = app.config['JWT_SECRET']
self.password_min_length = 12
self.max_login_attempts = 5
self.lockout_duration = timedelta(minutes=30)
self.session_timeout = timedelta(hours=24)
self.email_verification_timeout = timedelta(hours=48)
# Initialize database
Base.metadata.create_all(self.db_engine)
def register_user(self, registration_data: Dict) -> Tuple[bool, str]:
"""Secure user registration with comprehensive validation"""
try:
# Validate input data
email = registration_data.get('email', '').strip().lower()
username = registration_data.get('username', '').strip()
password = registration_data.get('password', '')
# Validate email
try:
valid_email = validate_email(email)
email = valid_email.email
except EmailNotValidError:
return False, "Invalid email address"
# Validate username
if not self._validate_username(username):
return False, "Username must be 3-20 characters, alphanumeric and underscore only"
# Validate password strength
password_errors = self._validate_password_strength(password, username, email)
if password_errors:
return False, f"Password requirements not met: {', '.join(password_errors)}"
# Check if user exists
session = self.Session()
try:
existing_user = session.query(User).filter(
(User.email == email) | (User.username == username)
).first()
if existing_user:
if existing_user.email == email:
return False, "Email already registered"
else:
return False, "Username already taken"
# Hash password
password_hash = self._hash_password(password)
# Create user
new_user = User(
email=email,
username=username,
password_hash=password_hash,
email_verified=False
)
session.add(new_user)
session.commit()
# Send verification email
self._send_verification_email(new_user.id, email)
# Log registration event
self._log_security_event('user_registration', {
'user_id': new_user.id,
'username': username,
'ip_address': request.remote_addr
})
return True, "Registration successful. Please check your email to verify your account."
finally:
session.close()
except Exception as e:
self._log_security_event('registration_error', {
'error': str(e),
'ip_address': request.remote_addr
})
return False, "Registration failed. Please try again."
def _validate_username(self, username: str) -> bool:
"""Validate username format"""
if not username or len(username) < 3 or len(username) > 20:
return False
# Only alphanumeric and underscore
if not re.match(r'^[a-zA-Z0-9_]+$', username):
return False
# Prevent reserved usernames
reserved = ['admin', 'root', 'system', 'api', 'www', 'mail']
if username.lower() in reserved:
return False
return True
def _validate_password_strength(self, password: str,
username: str, email: str) -> List[str]:
"""Comprehensive password validation"""
errors = []
if len(password) < self.password_min_length:
errors.append(f"Minimum {self.password_min_length} characters required")
if not re.search(r'[A-Z]', password):
errors.append("At least one uppercase letter required")
if not re.search(r'[a-z]', password):
errors.append("At least one lowercase letter required")
if not re.search(r'\d', password):
errors.append("At least one digit required")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("At least one special character required")
# Check for common patterns
if username.lower() in password.lower():
errors.append("Password cannot contain username")
if email.split('@')[0].lower() in password.lower():
errors.append("Password cannot contain email")
# Check against common passwords
common_passwords = self._load_common_passwords()
if password.lower() in common_passwords:
errors.append("Password is too common")
# Check for repeated characters
if re.search(r'(.)\1{2,}', password):
errors.append("Password contains too many repeated characters")
return errors
def _hash_password(self, password: str) -> str:
"""Securely hash password using bcrypt"""
return bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt(rounds=12)
).decode('utf-8')
def authenticate_user(self, credentials: Dict) -> Tuple[bool, Dict]:
"""Secure user authentication with rate limiting"""
username_or_email = credentials.get('username', '').strip()
password = credentials.get('password', '')
totp_code = credentials.get('totp_code')
# Rate limiting check
if not self._check_rate_limit(request.remote_addr):
return False, {'error': 'Too many login attempts. Please try again later.'}
session = self.Session()
try:
# Find user by username or email
user = session.query(User).filter(
(User.username == username_or_email) |
(User.email == username_or_email.lower())
).first()
if not user:
# Prevent username enumeration
self._simulate_password_check()
return False, {'error': 'Invalid credentials'}
# Check if account is locked
if user.locked_until and datetime.utcnow() < user.locked_until:
remaining = (user.locked_until - datetime.utcnow()).total_seconds()
return False, {
'error': f'Account locked. Try again in {int(remaining/60)} minutes.'
}
# Verify password
if not bcrypt.checkpw(password.encode('utf-8'),
user.password_hash.encode('utf-8')):
# Increment failed attempts
user.failed_login_attempts += 1
if user.failed_login_attempts >= self.max_login_attempts:
user.locked_until = datetime.utcnow() + self.lockout_duration
session.commit()
self._log_security_event('account_locked', {
'user_id': user.id,
'ip_address': request.remote_addr
})
return False, {'error': 'Account locked due to multiple failed attempts'}
session.commit()
return False, {'error': 'Invalid credentials'}
# Check email verification
if not user.email_verified:
return False, {'error': 'Please verify your email before logging in'}
# Check 2FA if enabled
if user.totp_secret:
if not totp_code:
return False, {'error': '2FA code required', 'requires_2fa': True}
if not self._verify_totp(user.totp_secret, totp_code):
return False, {'error': 'Invalid 2FA code'}
# Successful authentication
user.failed_login_attempts = 0
user.last_login = datetime.utcnow()
session.commit()
# Generate tokens
tokens = self._generate_auth_tokens(user.id)
# Log successful login
self._log_security_event('successful_login', {
'user_id': user.id,
'ip_address': request.remote_addr
})
return True, {
'user_id': user.id,
'username': user.username,
'tokens': tokens
}
except Exception as e:
self._log_security_event('authentication_error', {
'error': str(e),
'ip_address': request.remote_addr
})
return False, {'error': 'Authentication failed'}
finally:
session.close()
def _simulate_password_check(self):
"""Simulate password check timing to prevent enumeration"""
# Perform a dummy bcrypt operation
bcrypt.checkpw(b'dummy', bcrypt.hashpw(b'dummy', bcrypt.gensalt(12)))
def _check_rate_limit(self, ip_address: str) -> bool:
"""Check login rate limits"""
key = f"login_attempts:{ip_address}"
# Increment counter
attempts = self.redis.incr(key)
# Set expiry on first attempt
if attempts == 1:
self.redis.expire(key, 900) # 15 minutes
# Allow 10 attempts per 15 minutes
return attempts <= 10
def _generate_auth_tokens(self, user_id: int) -> Dict[str, str]:
"""Generate JWT tokens"""
# Access token - short lived
access_payload = {
'user_id': user_id,
'type': 'access',
'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
'iat': datetime.now(timezone.utc),
'jti': secrets.token_urlsafe(16)
}
access_token = jwt.encode(
access_payload,
self.jwt_secret,
algorithm='HS256'
)
# Refresh token - longer lived
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'exp': datetime.now(timezone.utc) + timedelta(days=30),
'iat': datetime.now(timezone.utc),
'jti': secrets.token_urlsafe(32)
}
refresh_token = jwt.encode(
refresh_payload,
self.jwt_secret,
algorithm='HS256'
)
# Store refresh token in Redis
self.redis.setex(
f"refresh_token:{refresh_payload['jti']}",
timedelta(days=30).total_seconds(),
str(user_id)
)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer',
'expires_in': 900 # 15 minutes
}
def setup_2fa(self, user_id: int) -> Dict[str, str]:
"""Setup TOTP-based 2FA"""
session = self.Session()
try:
user = session.query(User).filter_by(id=user_id).first()
if not user:
return {'error': 'User not found'}
# Generate secret
secret = pyotp.random_base32()
# Generate provisioning URI for QR code
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user.email,
issuer_name='SecureApp'
)
# Store secret (in production, encrypt this)
user.totp_secret = secret
session.commit()
return {
'secret': secret,
'qr_code': self._generate_qr_code(provisioning_uri),
'manual_entry_key': secret
}
finally:
session.close()
def _verify_totp(self, secret: str, code: str) -> bool:
"""Verify TOTP code"""
totp = pyotp.TOTP(secret)
# Allow 1 window drift for clock skew
return totp.verify(code, valid_window=1)
# Flask routes implementation
app = Flask(__name__)
app.config['SECRET_KEY'] = secrets.token_hex(32)
app.config['JWT_SECRET'] = secrets.token_hex(32)
# Initialize auth system
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
auth_system = SecureAuthenticationSystem(
app,
'postgresql://user:pass@localhost/authdb',
redis_client
)
@app.route('/api/register', methods=['POST'])
def register():
"""User registration endpoint"""
try:
data = request.get_json()
# Validate request
required_fields = ['email', 'username', 'password']
if not all(field in data for field in required_fields):
return jsonify({'error': 'Missing required fields'}), 400
# Register user
success, message = auth_system.register_user(data)
if success:
return jsonify({'message': message}), 201
else:
return jsonify({'error': message}), 400
except Exception as e:
app.logger.error(f"Registration error: {e}")
return jsonify({'error': 'Registration failed'}), 500
@app.route('/api/login', methods=['POST'])
def login():
"""User login endpoint"""
try:
data = request.get_json()
# Validate request
if not data.get('username') or not data.get('password'):
return jsonify({'error': 'Username and password required'}), 400
# Authenticate
success, result = auth_system.authenticate_user(data)
if success:
response = jsonify({
'user': {
'id': result['user_id'],
'username': result['username']
},
'tokens': result['tokens']
})
# Set secure cookie for refresh token
response.set_cookie(
'refresh_token',
result['tokens']['refresh_token'],
max_age=30*24*60*60, # 30 days
secure=True,
httponly=True,
samesite='Strict'
)
return response, 200
else:
status_code = 401
if result.get('requires_2fa'):
status_code = 200 # Need 2FA code
return jsonify(result), status_code
except Exception as e:
app.logger.error(f"Login error: {e}")
return jsonify({'error': 'Login failed'}), 500
@app.route('/api/verify-email/<token>', methods=['GET'])
def verify_email(token):
"""Email verification endpoint"""
try:
# Verify token
payload = jwt.decode(
token,
app.config['JWT_SECRET'],
algorithms=['HS256']
)
if payload.get('type') != 'email_verification':
return jsonify({'error': 'Invalid token'}), 400
# Update user
session = auth_system.Session()
try:
user = session.query(User).filter_by(id=payload['user_id']).first()
if user:
user.email_verified = True
session.commit()
return jsonify({'message': 'Email verified successfully'}), 200
else:
return jsonify({'error': 'User not found'}), 404
finally:
session.close()
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Verification link expired'}), 400
except Exception:
return jsonify({'error': 'Invalid verification link'}), 400
def require_auth(f):
"""Decorator for protecting routes"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Get token from header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({'error': 'Invalid authorization header'}), 401
token = auth_header[7:]
try:
# Verify token
payload = jwt.decode(
token,
app.config['JWT_SECRET'],
algorithms=['HS256']
)
if payload.get('type') != 'access':
return jsonify({'error': 'Invalid token type'}), 401
# Add user context to request
request.user_id = payload['user_id']
return f(*args, **kwargs)
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except Exception:
return jsonify({'error': 'Invalid token'}), 401
return decorated_function
@app.route('/api/profile', methods=['GET'])
@require_auth
def get_profile():
"""Protected endpoint example"""
# Access user ID from request context
user_id = request.user_id
session = auth_system.Session()
try:
user = session.query(User).filter_by(id=user_id).first()
if user:
return jsonify({
'id': user.id,
'username': user.username,
'email': user.email,
'created_at': user.created_at.isoformat(),
'two_factor_enabled': bool(user.totp_secret)
}), 200
else:
return jsonify({'error': 'User not found'}), 404
finally:
session.close()
// JavaScript - Secure Authentication Implementation
const express = require('express');
const bcrypt = require('bcrypt');
const jwt = require('jsonwebtoken');
const speakeasy = require('speakeasy');
const QRCode = require('qrcode');
const validator = require('validator');
const rateLimit = require('express-rate-limit');
const { v4: uuidv4 } = require('uuid');
const redis = require('redis');
const { promisify } = require('util');
class SecureAuthenticationSystem {
constructor(app, db, redisClient) {
this.app = app;
this.db = db;
this.redis = redisClient;
// Promisify Redis methods
this.redisGet = promisify(redisClient.get).bind(redisClient);
this.redisSet = promisify(redisClient.set).bind(redisClient);
this.redisSetex = promisify(redisClient.setex).bind(redisClient);
this.redisIncr = promisify(redisClient.incr).bind(redisClient);
this.redisExpire = promisify(redisClient.expire).bind(redisClient);
// Security configuration
this.config = {
jwtSecret: process.env.JWT_SECRET || 'change-this-secret',
bcryptRounds: 12,
passwordMinLength: 12,
maxLoginAttempts: 5,
lockoutDuration: 30 * 60 * 1000, // 30 minutes
sessionTimeout: 24 * 60 * 60 * 1000, // 24 hours
emailVerificationTimeout: 48 * 60 * 60 * 1000, // 48 hours
accessTokenExpiry: '15m',
refreshTokenExpiry: '30d'
};
this.setupMiddleware();
this.setupRoutes();
}
setupMiddleware() {
// Rate limiting for auth endpoints
const authLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 10, // 10 requests per window
message: 'Too many authentication attempts, please try again later',
standardHeaders: true,
legacyHeaders: false,
handler: (req, res) => {
this.logSecurityEvent('rate_limit_exceeded', {
ip: req.ip,
endpoint: req.path
});
res.status(429).json({
error: 'Too many requests, please try again later'
});
}
});
// Apply rate limiting to auth routes
this.app.use('/api/auth', authLimiter);
// Security headers
this.app.use((req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '1; mode=block');
res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains');
next();
});
}
setupRoutes() {
// Registration endpoint
this.app.post('/api/auth/register', async (req, res) => {
try {
const result = await this.registerUser(req.body);
if (result.success) {
res.status(201).json({
message: result.message,
userId: result.userId
});
} else {
res.status(400).json({
error: result.error
});
}
} catch (error) {
this.logSecurityEvent('registration_error', {
error: error.message,
ip: req.ip
});
res.status(500).json({
error: 'Registration failed'
});
}
});
// Login endpoint
this.app.post('/api/auth/login', async (req, res) => {
try {
const result = await this.authenticateUser(req.body, req.ip);
if (result.success) {
// Set refresh token as httpOnly cookie
res.cookie('refreshToken', result.tokens.refreshToken, {
httpOnly: true,
secure: process.env.NODE_ENV === 'production',
sameSite: 'strict',
maxAge: 30 * 24 * 60 * 60 * 1000 // 30 days
});
res.json({
user: result.user,
accessToken: result.tokens.accessToken,
expiresIn: result.tokens.expiresIn
});
} else {
const statusCode = result.requires2FA ? 200 : 401;
res.status(statusCode).json(result);
}
} catch (error) {
this.logSecurityEvent('login_error', {
error: error.message,
ip: req.ip
});
res.status(500).json({
error: 'Login failed'
});
}
});
// Email verification endpoint
this.app.get('/api/auth/verify-email/:token', async (req, res) => {
try {
const result = await this.verifyEmail(req.params.token);
if (result.success) {
res.json({ message: result.message });
} else {
res.status(400).json({ error: result.error });
}
} catch (error) {
res.status(500).json({
error: 'Verification failed'
});
}
});
// 2FA setup endpoint
this.app.post('/api/auth/2fa/setup', this.requireAuth.bind(this), async (req, res) => {
try {
const result = await this.setup2FA(req.user.id);
res.json(result);
} catch (error) {
res.status(500).json({
error: 'Failed to setup 2FA'
});
}
});
}
async registerUser(data) {
// Validate input
const { email, username, password } = data;
if (!email || !username || !password) {
return { success: false, error: 'All fields are required' };
}
// Validate email
if (!validator.isEmail(email)) {
return { success: false, error: 'Invalid email address' };
}
// Normalize email
const normalizedEmail = validator.normalizeEmail(email);
// Validate username
const usernameValidation = this.validateUsername(username);
if (!usernameValidation.valid) {
return { success: false, error: usernameValidation.error };
}
// Validate password
const passwordValidation = await this.validatePassword(password, username, email);
if (!passwordValidation.valid) {
return { success: false, error: passwordValidation.errors.join('. ') };
}
// Check if user exists
const existingUser = await this.db.query(
'SELECT id FROM users WHERE email = $1 OR username = $2',
[normalizedEmail, username]
);
if (existingUser.rows.length > 0) {
return { success: false, error: 'User already exists' };
}
// Hash password
const passwordHash = await bcrypt.hash(password, this.config.bcryptRounds);
// Create user
const result = await this.db.query(
`INSERT INTO users (id, email, username, password_hash, email_verified, created_at)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
[uuidv4(), normalizedEmail, username, passwordHash, false, new Date()]
);
const userId = result.rows[0].id;
// Send verification email
await this.sendVerificationEmail(userId, normalizedEmail);
// Log event
this.logSecurityEvent('user_registered', {
userId,
username
});
return {
success: true,
message: 'Registration successful. Please check your email to verify your account.',
userId
};
}
validateUsername(username) {
if (!username || username.length < 3 || username.length > 20) {
return { valid: false, error: 'Username must be 3-20 characters' };
}
if (!/^[a-zA-Z0-9_]+$/.test(username)) {
return { valid: false, error: 'Username can only contain letters, numbers, and underscores' };
}
const reserved = ['admin', 'root', 'system', 'api', 'www'];
if (reserved.includes(username.toLowerCase())) {
return { valid: false, error: 'Username is reserved' };
}
return { valid: true };
}
async validatePassword(password, username, email) {
const errors = [];
if (password.length < this.config.passwordMinLength) {
errors.push(`Password must be at least ${this.config.passwordMinLength} characters`);
}
if (!/[A-Z]/.test(password)) {
errors.push('Password must contain at least one uppercase letter');
}
if (!/[a-z]/.test(password)) {
errors.push('Password must contain at least one lowercase letter');
}
if (!/\d/.test(password)) {
errors.push('Password must contain at least one digit');
}
if (!/[!@#$%^&*(),.?":{}|<>]/.test(password)) {
errors.push('Password must contain at least one special character');
}
// Check for username/email in password
if (password.toLowerCase().includes(username.toLowerCase())) {
errors.push('Password cannot contain username');
}
const emailLocal = email.split('@')[0];
if (password.toLowerCase().includes(emailLocal.toLowerCase())) {
errors.push('Password cannot contain part of email');
}
// Check common passwords
const commonPasswords = await this.loadCommonPasswords();
if (commonPasswords.has(password.toLowerCase())) {
errors.push('Password is too common');
}
// Check for repeated characters
if (/(.)\1{2,}/.test(password)) {
errors.push('Password contains too many repeated characters');
}
return {
valid: errors.length === 0,
errors
};
}
async authenticateUser(credentials, ipAddress) {
const { username, password, totpCode } = credentials;
if (!username || !password) {
return { success: false, error: 'Username and password required' };
}
// Check rate limiting
const rateLimitKey = `login_attempts:${ipAddress}`;
const attempts = await this.redisIncr(rateLimitKey);
if (attempts === 1) {
await this.redisExpire(rateLimitKey, 900); // 15 minutes
}
if (attempts > 10) {
return { success: false, error: 'Too many login attempts' };
}
// Find user
const userResult = await this.db.query(
`SELECT id, username, email, password_hash, email_verified,
totp_secret, failed_login_attempts, locked_until
FROM users
WHERE username = $1 OR email = $1`,
[username]
);
if (userResult.rows.length === 0) {
// Simulate password check to prevent timing attacks
await bcrypt.compare('dummy', '$2b$12$dummy.hash.to.prevent.timing');
return { success: false, error: 'Invalid credentials' };
}
const user = userResult.rows[0];
// Check if account is locked
if (user.locked_until && new Date() < new Date(user.locked_until)) {
const remaining = Math.ceil((new Date(user.locked_until) - new Date()) / 60000);
return {
success: false,
error: `Account locked. Try again in ${remaining} minutes`
};
}
// Verify password
const passwordValid = await bcrypt.compare(password, user.password_hash);
if (!passwordValid) {
// Increment failed attempts
const newAttempts = user.failed_login_attempts + 1;
let lockedUntil = null;
if (newAttempts >= this.config.maxLoginAttempts) {
lockedUntil = new Date(Date.now() + this.config.lockoutDuration);
this.logSecurityEvent('account_locked', {
userId: user.id,
ip: ipAddress
});
}
await this.db.query(
'UPDATE users SET failed_login_attempts = $1, locked_until = $2 WHERE id = $3',
[newAttempts, lockedUntil, user.id]
);
return { success: false, error: 'Invalid credentials' };
}
// Check email verification
if (!user.email_verified) {
return { success: false, error: 'Please verify your email before logging in' };
}
// Check 2FA
if (user.totp_secret) {
if (!totpCode) {
return { success: false, error: '2FA code required', requires2FA: true };
}
const verified = speakeasy.totp.verify({
secret: user.totp_secret,
encoding: 'base32',
token: totpCode,
window: 1
});
if (!verified) {
return { success: false, error: 'Invalid 2FA code' };
}
}
// Reset failed attempts and update last login
await this.db.query(
'UPDATE users SET failed_login_attempts = 0, last_login = $1 WHERE id = $2',
[new Date(), user.id]
);
// Generate tokens
const tokens = await this.generateAuthTokens(user.id);
// Log successful login
this.logSecurityEvent('successful_login', {
userId: user.id,
ip: ipAddress
});
return {
success: true,
user: {
id: user.id,
username: user.username,
email: user.email
},
tokens
};
}
async generateAuthTokens(userId) {
// Access token
const accessToken = jwt.sign(
{
userId,
type: 'access',
jti: uuidv4()
},
this.config.jwtSecret,
{
expiresIn: this.config.accessTokenExpiry,
issuer: 'SecureApp',
audience: 'SecureApp'
}
);
// Refresh token
const refreshTokenId = uuidv4();
const refreshToken = jwt.sign(
{
userId,
type: 'refresh',
jti: refreshTokenId
},
this.config.jwtSecret,
{
expiresIn: this.config.refreshTokenExpiry,
issuer: 'SecureApp',
audience: 'SecureApp'
}
);
// Store refresh token in Redis
await this.redisSetex(
`refresh_token:${refreshTokenId}`,
30 * 24 * 60 * 60, // 30 days
userId
);
return {
accessToken,
refreshToken,
expiresIn: 900 // 15 minutes
};
}
async setup2FA(userId) {
// Generate secret
const secret = speakeasy.generateSecret({
name: 'SecureApp',
length: 32
});
// Store secret in database
await this.db.query(
'UPDATE users SET totp_secret = $1 WHERE id = $2',
[secret.base32, userId]
);
// Generate QR code
const qrCode = await QRCode.toDataURL(secret.otpauth_url);
return {
secret: secret.base32,
qrCode,
manualEntryKey: secret.base32
};
}
requireAuth(req, res, next) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'No valid auth token provided' });
}
const token = authHeader.substring(7);
try {
const payload = jwt.verify(token, this.config.jwtSecret, {
issuer: 'SecureApp',
audience: 'SecureApp'
});
if (payload.type !== 'access') {
return res.status(401).json({ error: 'Invalid token type' });
}
req.user = { id: payload.userId };
next();
} catch (error) {
if (error.name === 'TokenExpiredError') {
return res.status(401).json({ error: 'Token expired' });
}
return res.status(401).json({ error: 'Invalid token' });
}
}
logSecurityEvent(eventType, details) {
const event = {
type: eventType,
timestamp: new Date().toISOString(),
details,
environment: process.env.NODE_ENV
};
// In production, send to logging service
console.log('SECURITY_EVENT:', JSON.stringify(event));
}
}
// Initialize the authentication system
const app = express();
const db = require('./db'); // PostgreSQL connection
const redisClient = redis.createClient();
const authSystem = new SecureAuthenticationSystem(app, db, redisClient);
module.exports = { authSystem, app };
This comprehensive authentication system demonstrates multiple security principles working together:
- Strong password requirements with entropy checking
- Secure password hashing with bcrypt
- Rate limiting to prevent brute force attacks
- Account lockout after failed attempts
- Email verification for new accounts
- Optional two-factor authentication
- JWT-based session management with refresh tokens
- Protection against timing attacks
- Comprehensive security event logging
- Input validation and sanitization throughout
The implementation shows how security must be built into every aspect of the authentication flow, from initial registration through ongoing session management. Each security control addresses specific threats while working together to create a robust authentication system.