Token-Based Authentication

Token-Based Authentication

Modern applications increasingly rely on token-based authentication, particularly for API access and single-page applications. JSON Web Tokens (JWT) and opaque tokens each have their place, but must be implemented securely to prevent token theft, replay attacks, and other vulnerabilities.

# Python - Secure Token Authentication
import jwt
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional, Tuple
import redis
from cryptography.fernet import Fernet
import base64
import hashlib

class SecureTokenManager:
    def __init__(self, secret_key: str, redis_client: redis.Redis):
        self.secret_key = secret_key
        self.redis = redis_client
        self.fernet = Fernet(base64.urlsafe_b64encode(hashlib.sha256(secret_key.encode()).digest()))
        
        # Token configuration
        self.access_token_lifetime = timedelta(minutes=15)
        self.refresh_token_lifetime = timedelta(days=30)
        self.max_refresh_count = 5
        
    def generate_token_pair(self, user_id: int, 
                           additional_claims: Dict = None) -> Dict[str, str]:
        """Generate access and refresh token pair"""
        # Generate unique token ID for tracking
        token_id = secrets.token_urlsafe(16)
        
        # Access token with short lifetime
        access_payload = {
            'user_id': user_id,
            'type': 'access',
            'jti': token_id,
            'iat': datetime.now(timezone.utc),
            'exp': datetime.now(timezone.utc) + self.access_token_lifetime,
            **(additional_claims or {})
        }
        
        # Refresh token with longer lifetime
        refresh_token_id = secrets.token_urlsafe(32)
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'jti': refresh_token_id,
            'iat': datetime.now(timezone.utc),
            'exp': datetime.now(timezone.utc) + self.refresh_token_lifetime,
            'refresh_count': 0
        }
        
        # Store refresh token in Redis
        self.redis.setex(
            f"refresh_token:{refresh_token_id}",
            self.refresh_token_lifetime,
            json.dumps({
                'user_id': user_id,
                'access_token_id': token_id,
                'created_at': datetime.now(timezone.utc).isoformat(),
                'refresh_count': 0
            })
        )
        
        access_token = jwt.encode(access_payload, self.secret_key, algorithm='HS256')
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm='HS256')
        
        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
            'expires_in': int(self.access_token_lifetime.total_seconds())
        }
    
    def verify_access_token(self, token: str) -> Optional[Dict]:
        """Verify and decode access token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            
            # Verify token type
            if payload.get('type') != 'access':
                return None
            
            # Check if token is revoked
            if self.redis.exists(f"revoked_token:{payload['jti']}"):
                return None
            
            return payload
            
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def refresh_access_token(self, refresh_token: str) -> Optional[Dict[str, str]]:
        """Use refresh token to get new access token"""
        try:
            payload = jwt.decode(refresh_token, self.secret_key, algorithms=['HS256'])
            
            # Verify token type
            if payload.get('type') != 'refresh':
                return None
            
            # Get refresh token data from Redis
            token_data = self.redis.get(f"refresh_token:{payload['jti']}")
            if not token_data:
                return None
            
            token_info = json.loads(token_data)
            
            # Check refresh count limit
            if token_info['refresh_count'] >= self.max_refresh_count:
                self.revoke_refresh_token(payload['jti'])
                return None
            
            # Generate new access token
            new_token_pair = self.generate_token_pair(
                payload['user_id'],
                {'refreshed_from': payload['jti']}
            )
            
            # Update refresh count
            token_info['refresh_count'] += 1
            self.redis.setex(
                f"refresh_token:{payload['jti']}",
                self.refresh_token_lifetime,
                json.dumps(token_info)
            )
            
            return new_token_pair
            
        except jwt.InvalidTokenError:
            return None
    
    def revoke_token(self, token: str):
        """Revoke a token"""
        try:
            payload = jwt.decode(token, self.secret_key, 
                               algorithms=['HS256'], 
                               options={"verify_exp": False})
            
            # Add to revocation list with expiration
            ttl = payload['exp'] - datetime.now(timezone.utc).timestamp()
            if ttl > 0:
                self.redis.setex(
                    f"revoked_token:{payload['jti']}",
                    int(ttl),
                    '1'
                )
            
            # If refresh token, also remove from Redis
            if payload.get('type') == 'refresh':
                self.redis.delete(f"refresh_token:{payload['jti']}")
                
        except jwt.InvalidTokenError:
            pass
    
    def create_api_key(self, user_id: int, name: str, 
                      scopes: List[str]) -> Tuple[str, str]:
        """Create long-lived API key"""
        # Generate API key
        key_id = secrets.token_urlsafe(16)
        key_secret = secrets.token_urlsafe(32)
        
        # Hash the secret for storage
        key_hash = hashlib.sha256(key_secret.encode()).hexdigest()
        
        # Store API key metadata
        api_key_data = {
            'user_id': user_id,
            'name': name,
            'key_hash': key_hash,
            'scopes': scopes,
            'created_at': datetime.now(timezone.utc).isoformat(),
            'last_used': None,
            'usage_count': 0
        }
        
        self.redis.hset('api_keys', key_id, json.dumps(api_key_data))
        
        # Return key ID and secret (only shown once)
        return key_id, key_secret
    
    def verify_api_key(self, key_id: str, key_secret: str) -> Optional[Dict]:
        """Verify API key"""
        api_key_data = self.redis.hget('api_keys', key_id)
        if not api_key_data:
            return None
        
        key_info = json.loads(api_key_data)
        
        # Verify key secret
        provided_hash = hashlib.sha256(key_secret.encode()).hexdigest()
        if not secrets.compare_digest(provided_hash, key_info['key_hash']):
            return None
        
        # Update usage statistics
        key_info['last_used'] = datetime.now(timezone.utc).isoformat()
        key_info['usage_count'] += 1
        self.redis.hset('api_keys', key_id, json.dumps(key_info))
        
        return {
            'user_id': key_info['user_id'],
            'scopes': key_info['scopes'],
            'key_name': key_info['name']
        }
// JavaScript - Secure Token Authentication
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');

class SecureTokenManager {
    constructor(secretKey, redisClient) {
        this.secretKey = secretKey;
        this.redis = redisClient;
        
        // Token configuration
        this.accessTokenLifetime = 15 * 60; // 15 minutes
        this.refreshTokenLifetime = 30 * 24 * 60 * 60; // 30 days
        this.maxRefreshCount = 5;
        
        // Initialize token rotation secret
        this.currentSecretVersion = 1;
        this.secrets = {
            1: secretKey,
            // Add new versions during key rotation
        };
    }
    
    async generateTokenPair(userId, additionalClaims = {}) {
        // Generate unique token IDs
        const accessTokenId = crypto.randomBytes(16).toString('base64url');
        const refreshTokenId = crypto.randomBytes(32).toString('base64url');
        
        // Create access token
        const accessPayload = {
            userId,
            type: 'access',
            jti: accessTokenId,
            version: this.currentSecretVersion,
            ...additionalClaims
        };
        
        const accessToken = jwt.sign(
            accessPayload,
            this.secrets[this.currentSecretVersion],
            {
                algorithm: 'HS256',
                expiresIn: this.accessTokenLifetime
            }
        );
        
        // Create refresh token
        const refreshPayload = {
            userId,
            type: 'refresh',
            jti: refreshTokenId,
            version: this.currentSecretVersion,
            refreshCount: 0
        };
        
        const refreshToken = jwt.sign(
            refreshPayload,
            this.secrets[this.currentSecretVersion],
            {
                algorithm: 'HS256',
                expiresIn: this.refreshTokenLifetime
            }
        );
        
        // Store refresh token metadata
        await this.redis.setex(
            `refresh_token:${refreshTokenId}`,
            this.refreshTokenLifetime,
            JSON.stringify({
                userId,
                accessTokenId,
                createdAt: new Date().toISOString(),
                refreshCount: 0
            })
        );
        
        return {
            accessToken,
            refreshToken,
            tokenType: 'Bearer',
            expiresIn: this.accessTokenLifetime
        };
    }
    
    async verifyAccessToken(token) {
        try {
            // Decode without verification first to get version
            const unverified = jwt.decode(token);
            if (!unverified || unverified.type !== 'access') {
                return null;
            }
            
            // Verify with appropriate secret version
            const secretVersion = unverified.version || 1;
            const payload = jwt.verify(
                token,
                this.secrets[secretVersion],
                { algorithms: ['HS256'] }
            );
            
            // Check if token is revoked
            const isRevoked = await this.redis.get(`revoked_token:${payload.jti}`);
            if (isRevoked) {
                return null;
            }
            
            // Check additional security constraints
            if (payload.type !== 'access') {
                return null;
            }
            
            return payload;
            
        } catch (error) {
            if (error.name === 'TokenExpiredError') {
                // Log expired token attempts for monitoring
                console.log('Expired token attempt:', error.message);
            }
            return null;
        }
    }
    
    async refreshAccessToken(refreshToken) {
        try {
            // Decode and verify refresh token
            const unverified = jwt.decode(refreshToken);
            if (!unverified || unverified.type !== 'refresh') {
                return null;
            }
            
            const payload = jwt.verify(
                refreshToken,
                this.secrets[unverified.version || 1],
                { algorithms: ['HS256'] }
            );
            
            // Get refresh token metadata
            const tokenData = await this.redis.get(`refresh_token:${payload.jti}`);
            if (!tokenData) {
                return null;
            }
            
            const tokenInfo = JSON.parse(tokenData);
            
            // Check refresh count
            if (tokenInfo.refreshCount >= this.maxRefreshCount) {
                await this.revokeRefreshToken(payload.jti);
                return null;
            }
            
            // Generate new token pair
            const newTokens = await this.generateTokenPair(
                payload.userId,
                { refreshedFrom: payload.jti }
            );
            
            // Update refresh count
            tokenInfo.refreshCount++;
            await this.redis.setex(
                `refresh_token:${payload.jti}`,
                this.refreshTokenLifetime,
                JSON.stringify(tokenInfo)
            );
            
            return newTokens;
            
        } catch (error) {
            return null;
        }
    }
    
    async revokeToken(token) {
        try {
            const payload = jwt.decode(token);
            if (!payload || !payload.jti) {
                return;
            }
            
            // Calculate TTL for revocation entry
            const now = Math.floor(Date.now() / 1000);
            const ttl = payload.exp - now;
            
            if (ttl > 0) {
                await this.redis.setex(
                    `revoked_token:${payload.jti}`,
                    ttl,
                    '1'
                );
            }
            
            // If refresh token, remove metadata
            if (payload.type === 'refresh') {
                await this.redis.del(`refresh_token:${payload.jti}`);
            }
            
        } catch (error) {
            console.error('Token revocation error:', error);
        }
    }
    
    // API Key Management
    async createApiKey(userId, name, scopes = []) {
        const keyId = crypto.randomBytes(16).toString('base64url');
        const keySecret = crypto.randomBytes(32).toString('base64url');
        
        // Create composite key
        const apiKey = `${keyId}.${keySecret}`;
        
        // Hash secret for storage
        const keyHash = crypto
            .createHash('sha256')
            .update(keySecret)
            .digest('hex');
        
        // Store API key metadata
        await this.redis.hset(
            'api_keys',
            keyId,
            JSON.stringify({
                userId,
                name,
                keyHash,
                scopes,
                createdAt: new Date().toISOString(),
                lastUsed: null,
                usageCount: 0,
                active: true
            })
        );
        
        // Return full API key (only shown once)
        return {
            apiKey,
            keyId,
            name,
            scopes
        };
    }
    
    async verifyApiKey(apiKey) {
        try {
            // Parse API key
            const [keyId, keySecret] = apiKey.split('.');
            if (!keyId || !keySecret) {
                return null;
            }
            
            // Get key metadata
            const keyData = await this.redis.hget('api_keys', keyId);
            if (!keyData) {
                return null;
            }
            
            const keyInfo = JSON.parse(keyData);
            
            // Check if key is active
            if (!keyInfo.active) {
                return null;
            }
            
            // Verify secret
            const providedHash = crypto
                .createHash('sha256')
                .update(keySecret)
                .digest('hex');
            
            if (!crypto.timingSafeEqual(
                Buffer.from(providedHash),
                Buffer.from(keyInfo.keyHash)
            )) {
                return null;
            }
            
            // Update usage statistics
            keyInfo.lastUsed = new Date().toISOString();
            keyInfo.usageCount++;
            
            await this.redis.hset(
                'api_keys',
                keyId,
                JSON.stringify(keyInfo)
            );
            
            return {
                userId: keyInfo.userId,
                scopes: keyInfo.scopes,
                keyName: keyInfo.name
            };
            
        } catch (error) {
            console.error('API key verification error:', error);
            return null;
        }
    }
}

Implementing secure authentication and authorization patterns requires careful attention to detail and a deep understanding of potential attack vectors. These patterns provide a foundation for building secure applications, but must be adapted to specific use cases and regularly updated as new threats emerge.## Secure Data Storage and Encryption

Data security extends beyond transmission to encompass how information is stored, encrypted, and managed throughout its lifecycle. Both Python and JavaScript applications must implement robust data protection mechanisms to safeguard sensitive information from unauthorized access, whether the data resides in databases, files, or memory. This chapter explores comprehensive strategies for secure data storage and encryption, providing practical implementations that protect data at rest while maintaining application performance and usability.