Authentication and Authorization for APIs
Authentication and Authorization for APIs
Robust authentication forms the first line of defense for API security. While traditional session-based authentication works for web applications, APIs require stateless mechanisms that work across different clients and platforms. OAuth 2.0 and OpenID Connect have emerged as industry standards, providing flexible frameworks for delegated authorization and authentication. However, their complexity often leads to implementation errors that create vulnerabilities.
JSON Web Tokens (JWTs) offer a popular mechanism for API authentication, encoding user identity and permissions in cryptographically signed tokens. Proper JWT implementation requires careful attention to signature verification, expiration handling, and claim validation. Common mistakes include using symmetric signing keys across multiple services, accepting tokens with "none" algorithm, or failing to validate token expiration.
# Example: Secure API authentication with JWT and refresh tokens
import jwt
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import request, jsonify, g
import redis
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
class SecureAPIAuth:
def __init__(self, config):
self.config = config
self.redis_client = redis.Redis(
host=config['redis_host'],
port=config['redis_port'],
decode_responses=True,
ssl=True
)
self.load_keys()
def load_keys(self):
"""Load RSA keys for JWT signing"""
with open(self.config['private_key_path'], 'rb') as f:
self.private_key = serialization.load_pem_private_key(
f.read(),
password=self.config['key_password'].encode()
)
with open(self.config['public_key_path'], 'rb') as f:
self.public_key = serialization.load_pem_public_key(f.read())
def generate_tokens(self, user_id, permissions):
"""Generate access and refresh tokens"""
# Create access token with short expiration
access_payload = {
'user_id': user_id,
'permissions': permissions,
'type': 'access',
'iat': datetime.now(timezone.utc),
'exp': datetime.now(timezone.utc) + timedelta(minutes=15),
'jti': secrets.token_urlsafe(16) # Unique token ID
}
access_token = jwt.encode(
access_payload,
self.private_key,
algorithm='RS256',
headers={'kid': self.config['key_id']}
)
# Create refresh token with longer expiration
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'iat': datetime.now(timezone.utc),
'exp': datetime.now(timezone.utc) + timedelta(days=30),
'jti': secrets.token_urlsafe(16),
'family': secrets.token_urlsafe(16) # Token family for rotation
}
refresh_token = jwt.encode(
refresh_payload,
self.private_key,
algorithm='RS256'
)
# Store refresh token family in Redis
self.redis_client.setex(
f"refresh_family:{refresh_payload['family']}",
timedelta(days=30),
json.dumps({
'user_id': user_id,
'created_at': datetime.now(timezone.utc).isoformat(),
'token_jti': refresh_payload['jti']
})
)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer',
'expires_in': 900 # 15 minutes
}
def require_auth(self, required_permissions=None):
"""Decorator for protecting API endpoints"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Missing or invalid authorization header'}), 401
token = auth_header.split(' ')[1]
try:
# Decode and verify token
payload = jwt.decode(
token,
self.public_key,
algorithms=['RS256'],
options={'verify_exp': True}
)
# Verify token type
if payload.get('type') != 'access':
raise jwt.InvalidTokenError('Invalid token type')
# Check if token is revoked
if self.is_token_revoked(payload['jti']):
raise jwt.InvalidTokenError('Token has been revoked')
# Verify permissions
if required_permissions:
user_permissions = set(payload.get('permissions', []))
if not user_permissions.issuperset(set(required_permissions)):
return jsonify({'error': 'Insufficient permissions'}), 403
# Add user context to request
g.user_id = payload['user_id']
g.permissions = payload['permissions']
g.token_jti = payload['jti']
return f(*args, **kwargs)
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token has expired'}), 401
except jwt.InvalidTokenError as e:
return jsonify({'error': f'Invalid token: {str(e)}'}), 401
except Exception as e:
# Log unexpected errors without exposing details
self.logger.error(f"Authentication error: {str(e)}")
return jsonify({'error': 'Authentication failed'}), 401
return decorated_function
return decorator
def refresh_access_token(self, refresh_token):
"""Implement refresh token rotation"""
try:
payload = jwt.decode(
refresh_token,
self.public_key,
algorithms=['RS256'],
options={'verify_exp': True}
)
if payload.get('type') != 'refresh':
raise ValueError('Invalid token type')
# Check token family
family_key = f"refresh_family:{payload['family']}"
family_data = self.redis_client.get(family_key)
if not family_data:
# Family doesn't exist - possible token reuse attack
self.revoke_user_tokens(payload['user_id'])
raise ValueError('Invalid refresh token family')
family_info = json.loads(family_data)
# Verify this is the latest token in the family
if family_info['token_jti'] != payload['jti']:
# Old token reuse detected - revoke entire family
self.redis_client.delete(family_key)
self.revoke_user_tokens(payload['user_id'])
raise ValueError('Refresh token reuse detected')
# Generate new token pair
new_tokens = self.generate_tokens(
payload['user_id'],
self.get_user_permissions(payload['user_id'])
)
# Update family with new refresh token
new_refresh_payload = jwt.decode(
new_tokens['refresh_token'],
self.public_key,
algorithms=['RS256']
)
family_info['token_jti'] = new_refresh_payload['jti']
self.redis_client.setex(
family_key,
timedelta(days=30),
json.dumps(family_info)
)
# Revoke old refresh token
self.revoke_token(payload['jti'])
return new_tokens
except Exception as e:
self.logger.error(f"Refresh token error: {str(e)}")
raise
def revoke_token(self, jti):
"""Revoke a specific token"""
# Store revoked token JTI until expiration
self.redis_client.setex(
f"revoked_token:{jti}",
timedelta(hours=1), # Keep longer than token lifetime
'1'
)
def is_token_revoked(self, jti):
"""Check if token is revoked"""
return self.redis_client.exists(f"revoked_token:{jti}")