### Token-Based Authentication
Modern applications increasingly rely on token-based authentication, particularly for API access and single-page applications. JSON Web Tokens (JWT) and opaque tokens each have their place, but must be implemented securely to prevent token theft, replay attacks, and other vulnerabilities.
# Python - Secure Token Authentication
import base64
import hashlib
import json
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import jwt
import redis
from cryptography.fernet import Fernet
class SecureTokenManager:
    """Issue, verify, refresh and revoke JWTs, and manage long-lived API keys.

    JWTs are signed with HS256 using ``secret_key``. Redis holds the
    server-side state:

    * ``refresh_token:<jti>``  - JSON metadata for each live refresh token
    * ``revoked_token:<jti>``  - marker for a revoked token
    * ``api_keys`` (hash)      - JSON metadata per API key id
    """

    def __init__(self, secret_key: str, redis_client: 'redis.Redis'):
        """Store the signing secret and Redis client and set token lifetimes.

        Args:
            secret_key: Secret used to sign and verify JWTs.
            redis_client: Client used for refresh-token metadata, the
                revocation list and API-key storage.
        """
        self.secret_key = secret_key
        self.redis = redis_client
        # Fernet requires a 32-byte url-safe base64 key; derive one from the
        # SHA-256 digest of the secret. Not used by the methods below.
        self.fernet = Fernet(base64.urlsafe_b64encode(hashlib.sha256(secret_key.encode()).digest()))
        # Token configuration
        self.access_token_lifetime = timedelta(minutes=15)
        self.refresh_token_lifetime = timedelta(days=30)
        self.max_refresh_count = 5

    def generate_token_pair(self, user_id: int,
                            additional_claims: Optional[Dict] = None) -> Dict[str, str]:
        """Generate an access/refresh token pair for ``user_id``.

        Args:
            user_id: Identifier embedded in both tokens as ``user_id``.
            additional_claims: Extra claims merged into the access token only.
                Reserved claims (``user_id``, ``type``, ``jti``, ``iat``,
                ``exp``) always take precedence over these.

        Returns:
            Dict with ``access_token``, ``refresh_token``, ``token_type``
            (``'Bearer'``) and ``expires_in`` (access lifetime in seconds).
        """
        # One timestamp so iat/exp within the pair are mutually consistent.
        now = datetime.now(timezone.utc)

        # Unique token IDs for tracking and revocation
        token_id = secrets.token_urlsafe(16)
        refresh_token_id = secrets.token_urlsafe(32)

        # Access token with short lifetime. Caller-supplied claims go first
        # so they can never overwrite the reserved claims that follow.
        access_payload = {
            **(additional_claims or {}),
            'user_id': user_id,
            'type': 'access',
            'jti': token_id,
            'iat': now,
            'exp': now + self.access_token_lifetime,
        }

        # Refresh token with longer lifetime
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'jti': refresh_token_id,
            'iat': now,
            'exp': now + self.refresh_token_lifetime,
            'refresh_count': 0,
        }

        # Sign before touching Redis so a failed encode (e.g. a
        # non-serializable extra claim) leaves no orphaned metadata behind.
        access_token = jwt.encode(access_payload, self.secret_key, algorithm='HS256')
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm='HS256')

        # Store refresh token metadata in Redis
        self.redis.setex(
            f"refresh_token:{refresh_token_id}",
            self.refresh_token_lifetime,
            json.dumps({
                'user_id': user_id,
                'access_token_id': token_id,
                'created_at': now.isoformat(),
                'refresh_count': 0,
            })
        )

        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
            'expires_in': int(self.access_token_lifetime.total_seconds()),
        }

    def verify_access_token(self, token: str) -> Optional[Dict]:
        """Verify and decode an access token.

        Returns:
            The decoded payload, or ``None`` if the token is invalid, expired,
            not of type ``'access'``, lacks a ``jti`` or has been revoked.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError.
            return None

        # Verify token type
        if payload.get('type') != 'access':
            return None

        # A token without a jti cannot be checked against the revocation list.
        token_id = payload.get('jti')
        if not token_id:
            return None

        # Check if token is revoked
        if self.redis.exists(f"revoked_token:{token_id}"):
            return None
        return payload

    def refresh_access_token(self, refresh_token: str) -> Optional[Dict[str, str]]:
        """Use a refresh token to obtain a new token pair.

        The refresh token may be used at most ``max_refresh_count`` times;
        once the limit is reached its metadata is removed.

        Returns:
            A new token pair (see ``generate_token_pair``), or ``None`` if the
            refresh token is invalid, unknown to Redis or exhausted.
        """
        try:
            payload = jwt.decode(refresh_token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return None

        # Verify token type
        if payload.get('type') != 'refresh':
            return None
        refresh_id = payload.get('jti')
        if not refresh_id:
            return None

        # Get refresh token data from Redis
        redis_key = f"refresh_token:{refresh_id}"
        token_data = self.redis.get(redis_key)
        if not token_data:
            return None
        token_info = json.loads(token_data)

        # Check refresh count limit
        if token_info['refresh_count'] >= self.max_refresh_count:
            self.revoke_refresh_token(refresh_id)
            return None

        # Generate new token pair
        new_token_pair = self.generate_token_pair(
            payload['user_id'],
            {'refreshed_from': refresh_id}
        )

        # Update refresh count
        token_info['refresh_count'] += 1
        self.redis.setex(
            redis_key,
            self.refresh_token_lifetime,
            json.dumps(token_info)
        )
        return new_token_pair

    def revoke_refresh_token(self, token_id: str) -> None:
        """Delete the Redis metadata for refresh token ``token_id``.

        ``refresh_access_token`` rejects any refresh token whose metadata is
        missing, so deleting the record is what invalidates the token.
        """
        self.redis.delete(f"refresh_token:{token_id}")

    def revoke_token(self, token: str) -> bool:
        """Revoke an access or refresh token.

        The signature is verified, but expiry is deliberately ignored so an
        already-expired token can be submitted without raising.

        Returns:
            ``True`` if the token was validly signed and carried a ``jti``,
            ``False`` otherwise (nothing is written in that case).
        """
        try:
            payload = jwt.decode(token, self.secret_key,
                                 algorithms=['HS256'],
                                 options={"verify_exp": False})
        except jwt.InvalidTokenError:
            return False

        token_id = payload.get('jti')
        if not token_id:
            return False

        revoked_key = f"revoked_token:{token_id}"
        expires_at = payload.get('exp')
        if expires_at is None:
            # A token without exp never expires, so neither may its marker.
            self.redis.set(revoked_key, '1')
        else:
            # Keep the marker only as long as the token could still be used.
            ttl = expires_at - datetime.now(timezone.utc).timestamp()
            if ttl > 0:
                # Redis rejects a zero expiry, so round sub-second
                # remainders up to one second.
                self.redis.setex(revoked_key, max(1, int(ttl)), '1')

        # If refresh token, also remove its metadata from Redis
        if payload.get('type') == 'refresh':
            self.revoke_refresh_token(token_id)
        return True

    def create_api_key(self, user_id: int, name: str,
                       scopes: List[str]) -> Tuple[str, str]:
        """Create a long-lived API key.

        Only the SHA-256 hash of the secret is stored, so the returned secret
        cannot be recovered later.

        Returns:
            ``(key_id, key_secret)``.
        """
        # Generate API key
        key_id = secrets.token_urlsafe(16)
        key_secret = secrets.token_urlsafe(32)

        # Hash the secret for storage
        key_hash = hashlib.sha256(key_secret.encode()).hexdigest()

        # Store API key metadata
        api_key_data = {
            'user_id': user_id,
            'name': name,
            'key_hash': key_hash,
            'scopes': scopes,
            'created_at': datetime.now(timezone.utc).isoformat(),
            'last_used': None,
            'usage_count': 0,
            # Lets a key be disabled without deleting its record.
            'active': True,
        }
        self.redis.hset('api_keys', key_id, json.dumps(api_key_data))

        # Return key ID and secret (only shown once)
        return key_id, key_secret

    def verify_api_key(self, key_id: str, key_secret: str) -> Optional[Dict]:
        """Verify an API key and record its use.

        Returns:
            Dict with ``user_id``, ``scopes`` and ``key_name``, or ``None`` if
            the key id is unknown, the key is inactive or the secret is wrong.
        """
        api_key_data = self.redis.hget('api_keys', key_id)
        if not api_key_data:
            return None
        key_info = json.loads(api_key_data)

        # Records written before the 'active' flag existed count as active.
        if not key_info.get('active', True):
            return None

        # Verify key secret with a constant-time comparison
        provided_hash = hashlib.sha256(key_secret.encode()).hexdigest()
        if not secrets.compare_digest(provided_hash, key_info['key_hash']):
            return None

        # Update usage statistics
        key_info['last_used'] = datetime.now(timezone.utc).isoformat()
        key_info['usage_count'] += 1
        self.redis.hset('api_keys', key_id, json.dumps(key_info))

        return {
            'user_id': key_info['user_id'],
            'scopes': key_info['scopes'],
            'key_name': key_info['name'],
        }
// JavaScript - Secure Token Authentication
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');
/**
 * Issues, verifies, refreshes and revokes HS256 JWTs and manages API keys.
 *
 * Redis holds the server-side state:
 *   - `refresh_token:<jti>`  JSON metadata for each live refresh token
 *   - `revoked_token:<jti>`  marker for a revoked token
 *   - `api_keys` (hash)      JSON metadata per API key id
 */
class SecureTokenManager {
  /**
   * @param {string} secretKey - Signing secret, registered as secret version 1.
   * @param {object} redisClient - Client exposing promise-returning
   *   `set`, `setex`, `get`, `del`, `hset` and `hget`.
   */
  constructor(secretKey, redisClient) {
    this.secretKey = secretKey;
    this.redis = redisClient;

    // Token configuration (seconds)
    this.accessTokenLifetime = 15 * 60; // 15 minutes
    this.refreshTokenLifetime = 30 * 24 * 60 * 60; // 30 days
    this.maxRefreshCount = 5;

    // Initialize token rotation secret
    this.currentSecretVersion = 1;
    this.secrets = {
      1: secretKey,
      // Add new versions during key rotation
    };
  }

  /**
   * Look up the signing secret for a token's `version` claim.
   * The claim is read from an unverified token, so only own properties of
   * `this.secrets` are honoured (never inherited ones such as `constructor`).
   *
   * @param {*} version - Value of the `version` claim; falsy means version 1.
   * @returns {*} The secret, or null when the version is unknown.
   */
  _secretForVersion(version) {
    const key = version || 1;
    return Object.prototype.hasOwnProperty.call(this.secrets, key)
      ? this.secrets[key]
      : null;
  }

  /**
   * Verify a token's signature with the secret its `version` claim selects.
   *
   * @param {string} token - Encoded JWT.
   * @param {string|null} expectedType - Required `type` claim; null to accept any.
   * @param {object} [verifyOptions] - Extra options passed to `jwt.verify`.
   * @returns {object|null} Verified payload, or null when the token is not
   *   decodable, has the wrong type or names an unknown secret version.
   * @throws Whatever `jwt.verify` throws for a bad signature or expired token.
   */
  _verifySignedToken(token, expectedType, verifyOptions = {}) {
    // Decode without verification first, only to learn the secret version.
    const unverified = jwt.decode(token);
    if (!unverified || typeof unverified !== 'object') {
      return null;
    }
    if (expectedType && unverified.type !== expectedType) {
      return null;
    }

    const secret = this._secretForVersion(unverified.version);
    if (!secret) {
      return null;
    }

    const payload = jwt.verify(token, secret, {
      ...verifyOptions,
      algorithms: ['HS256']
    });

    // Re-check the type on the verified payload.
    if (expectedType && payload.type !== expectedType) {
      return null;
    }
    return payload;
  }

  /**
   * Generate an access/refresh token pair and store refresh-token metadata.
   *
   * @param {*} userId - Identifier embedded in both tokens as `userId`.
   * @param {object} [additionalClaims] - Extra claims for the access token
   *   only; reserved claims (`userId`, `type`, `jti`, `version`) always win.
   * @returns {Promise<{accessToken: string, refreshToken: string,
   *   tokenType: string, expiresIn: number}>}
   */
  async generateTokenPair(userId, additionalClaims = {}) {
    // Generate unique token IDs
    const accessTokenId = crypto.randomBytes(16).toString('base64url');
    const refreshTokenId = crypto.randomBytes(32).toString('base64url');
    const signingSecret = this.secrets[this.currentSecretVersion];

    // Create access token. Caller-supplied claims are spread first so they
    // can never overwrite the reserved claims that follow.
    const accessPayload = {
      ...additionalClaims,
      userId,
      type: 'access',
      jti: accessTokenId,
      version: this.currentSecretVersion
    };
    const accessToken = jwt.sign(accessPayload, signingSecret, {
      algorithm: 'HS256',
      expiresIn: this.accessTokenLifetime
    });

    // Create refresh token
    const refreshPayload = {
      userId,
      type: 'refresh',
      jti: refreshTokenId,
      version: this.currentSecretVersion,
      refreshCount: 0
    };
    const refreshToken = jwt.sign(refreshPayload, signingSecret, {
      algorithm: 'HS256',
      expiresIn: this.refreshTokenLifetime
    });

    // Store refresh token metadata
    await this.redis.setex(
      `refresh_token:${refreshTokenId}`,
      this.refreshTokenLifetime,
      JSON.stringify({
        userId,
        accessTokenId,
        createdAt: new Date().toISOString(),
        refreshCount: 0
      })
    );

    return {
      accessToken,
      refreshToken,
      tokenType: 'Bearer',
      expiresIn: this.accessTokenLifetime
    };
  }

  /**
   * Verify an access token.
   *
   * @param {string} token - Encoded JWT.
   * @returns {Promise<object|null>} The verified payload, or null when the
   *   token is invalid, expired, not an access token, lacks a `jti` or has
   *   been revoked.
   */
  async verifyAccessToken(token) {
    try {
      const payload = this._verifySignedToken(token, 'access');
      // Without a jti the token cannot be checked against the revocation list.
      if (!payload || !payload.jti) {
        return null;
      }

      // Check if token is revoked
      const isRevoked = await this.redis.get(`revoked_token:${payload.jti}`);
      if (isRevoked) {
        return null;
      }
      return payload;
    } catch (error) {
      if (error.name === 'TokenExpiredError') {
        // Log expired token attempts for monitoring
        console.log('Expired token attempt:', error.message);
      }
      return null;
    }
  }

  /**
   * Exchange a refresh token for a new token pair.
   * The refresh token may be used at most `maxRefreshCount` times; once the
   * limit is reached its metadata is removed.
   *
   * @param {string} refreshToken - Encoded refresh JWT.
   * @returns {Promise<object|null>} New token pair (see `generateTokenPair`),
   *   or null when the token is invalid, unknown to Redis or exhausted.
   */
  async refreshAccessToken(refreshToken) {
    try {
      const payload = this._verifySignedToken(refreshToken, 'refresh');
      if (!payload || !payload.jti) {
        return null;
      }

      // Get refresh token metadata
      const redisKey = `refresh_token:${payload.jti}`;
      const tokenData = await this.redis.get(redisKey);
      if (!tokenData) {
        return null;
      }
      const tokenInfo = JSON.parse(tokenData);

      // Check refresh count
      if (tokenInfo.refreshCount >= this.maxRefreshCount) {
        await this.revokeRefreshToken(payload.jti);
        return null;
      }

      // Generate new token pair
      const newTokens = await this.generateTokenPair(
        payload.userId,
        { refreshedFrom: payload.jti }
      );

      // Update refresh count
      tokenInfo.refreshCount++;
      await this.redis.setex(
        redisKey,
        this.refreshTokenLifetime,
        JSON.stringify(tokenInfo)
      );
      return newTokens;
    } catch (error) {
      return null;
    }
  }

  /**
   * Delete the Redis metadata for a refresh token.
   * `refreshAccessToken` rejects any refresh token whose metadata is missing,
   * so deleting the record is what invalidates the token.
   *
   * @param {string} tokenId - The refresh token's `jti`.
   * @returns {Promise<void>}
   */
  async revokeRefreshToken(tokenId) {
    await this.redis.del(`refresh_token:${tokenId}`);
  }

  /**
   * Revoke an access or refresh token.
   * The signature is verified (expiry is ignored) so that a forged token
   * cannot be used to write arbitrary revocation entries.
   *
   * @param {string} token - Encoded JWT.
   * @returns {Promise<boolean>} true if the token was validly signed and
   *   carried a `jti`; false otherwise (including on Redis errors).
   */
  async revokeToken(token) {
    try {
      let payload;
      try {
        payload = this._verifySignedToken(token, null, { ignoreExpiration: true });
      } catch (verifyError) {
        // Bad signature or malformed token: nothing to revoke.
        return false;
      }
      if (!payload || !payload.jti) {
        return false;
      }

      const revokedKey = `revoked_token:${payload.jti}`;
      if (typeof payload.exp !== 'number') {
        // A token without exp never expires, so neither may its marker.
        await this.redis.set(revokedKey, '1');
      } else {
        // Calculate TTL for revocation entry
        const now = Math.floor(Date.now() / 1000);
        const ttl = payload.exp - now;
        if (ttl > 0) {
          await this.redis.setex(revokedKey, ttl, '1');
        }
      }

      // If refresh token, remove metadata
      if (payload.type === 'refresh') {
        await this.revokeRefreshToken(payload.jti);
      }
      return true;
    } catch (error) {
      console.error('Token revocation error:', error);
      return false;
    }
  }

  // API Key Management

  /**
   * Create a long-lived API key of the form `<keyId>.<keySecret>`.
   * Only the SHA-256 hash of the secret is stored, so the returned key
   * cannot be recovered later.
   *
   * @param {*} userId - Owner of the key.
   * @param {string} name - Human-readable key name.
   * @param {string[]} [scopes] - Scopes granted to the key.
   * @returns {Promise<{apiKey: string, keyId: string, name: string, scopes: string[]}>}
   */
  async createApiKey(userId, name, scopes = []) {
    const keyId = crypto.randomBytes(16).toString('base64url');
    const keySecret = crypto.randomBytes(32).toString('base64url');

    // Create composite key
    const apiKey = `${keyId}.${keySecret}`;

    // Hash secret for storage
    const keyHash = crypto
      .createHash('sha256')
      .update(keySecret)
      .digest('hex');

    // Store API key metadata
    await this.redis.hset(
      'api_keys',
      keyId,
      JSON.stringify({
        userId,
        name,
        keyHash,
        scopes,
        createdAt: new Date().toISOString(),
        lastUsed: null,
        usageCount: 0,
        active: true
      })
    );

    // Return full API key (only shown once)
    return {
      apiKey,
      keyId,
      name,
      scopes
    };
  }

  /**
   * Verify an API key and record its use.
   *
   * @param {string} apiKey - Composite key `<keyId>.<keySecret>`.
   * @returns {Promise<{userId: *, scopes: string[], keyName: string}|null>}
   *   Key details, or null when the key is malformed, unknown, inactive or
   *   the secret does not match.
   */
  async verifyApiKey(apiKey) {
    try {
      if (typeof apiKey !== 'string') {
        return null;
      }

      // Parse API key
      const [keyId, keySecret] = apiKey.split('.');
      if (!keyId || !keySecret) {
        return null;
      }

      // Get key metadata
      const keyData = await this.redis.hget('api_keys', keyId);
      if (!keyData) {
        return null;
      }
      const keyInfo = JSON.parse(keyData);

      // Check if key is active
      if (!keyInfo.active) {
        return null;
      }

      // Verify secret
      const providedHash = Buffer.from(
        crypto.createHash('sha256').update(keySecret).digest('hex')
      );
      const storedHash = Buffer.from(String(keyInfo.keyHash));
      // timingSafeEqual throws on unequal lengths, so reject those up front.
      if (
        providedHash.length !== storedHash.length ||
        !crypto.timingSafeEqual(providedHash, storedHash)
      ) {
        return null;
      }

      // Update usage statistics
      keyInfo.lastUsed = new Date().toISOString();
      keyInfo.usageCount++;
      await this.redis.hset(
        'api_keys',
        keyId,
        JSON.stringify(keyInfo)
      );

      return {
        userId: keyInfo.userId,
        scopes: keyInfo.scopes,
        keyName: keyInfo.name
      };
    } catch (error) {
      console.error('API key verification error:', error);
      return null;
    }
  }
}
Implementing secure authentication and authorization patterns requires careful attention to detail and a deep understanding of potential attack vectors. These patterns provide a foundation for building secure applications, but must be adapted to specific use cases and regularly updated as new threats emerge.

## Secure Data Storage and Encryption
Data security extends beyond transmission to encompass how information is stored, encrypted, and managed throughout its lifecycle. Both Python and JavaScript applications must implement robust data protection mechanisms to safeguard sensitive information from unauthorized access, whether the data resides in databases, files, or memory. This chapter explores comprehensive strategies for secure data storage and encryption, providing practical implementations that protect data at rest while maintaining application performance and usability.