Authentication and Authorization for APIs

Authentication and Authorization for APIs

Robust authentication forms the first line of defense for API security. While traditional session-based authentication works for web applications, APIs require stateless mechanisms that work across different clients and platforms. OAuth 2.0 and OpenID Connect have emerged as industry standards, providing flexible frameworks for delegated authorization and authentication. However, their complexity often leads to implementation errors that create vulnerabilities.

JSON Web Tokens (JWTs) offer a popular mechanism for API authentication, encoding user identity and permissions in cryptographically signed tokens. Proper JWT implementation requires careful attention to signature verification, expiration handling, and claim validation. Common mistakes include using the same symmetric signing key across multiple services, accepting tokens that specify the "none" algorithm, and failing to validate token expiration.

# Example: Secure API authentication with JWT and refresh tokens
import json
import logging
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps

import jwt
import redis
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from flask import request, jsonify, g

class SecureAPIAuth:
    def __init__(self, config):
        """Set up the Redis connection, the logger, and the signing keys.

        Args:
            config: Mapping of settings. This method reads ``redis_host``
                and ``redis_port``; ``load_keys`` reads the key file paths
                and key password from the same mapping.
        """
        self.config = config
        # Other methods of this class report failures through
        # ``self.logger``; create it here so those error paths do not fail
        # with AttributeError.
        self.logger = logging.getLogger(__name__)
        # TLS-enabled connection; decode_responses=True makes the client
        # return str values instead of bytes.
        self.redis_client = redis.Redis(
            host=config['redis_host'],
            port=config['redis_port'],
            decode_responses=True,
            ssl=True,
        )
        self.load_keys()
        
    def load_keys(self):
        """Load the RSA key pair used to sign and verify JWTs.

        Reads the PEM files named by ``private_key_path`` and
        ``public_key_path`` in the config and stores the parsed keys on
        ``self.private_key`` and ``self.public_key``.

        ``key_password`` may be a ``str`` or ``bytes``. When it is missing,
        ``None`` or empty, the private key is loaded as an unencrypted PEM
        instead of failing on ``None.encode()``.

        Raises:
            KeyError: if a key path is missing from the config.
            OSError: if a key file cannot be read.
            ValueError, TypeError: propagated from ``cryptography`` when the
                PEM data cannot be parsed or the password does not match the
                key's encryption state.
        """
        # Assumes the config is dict-like (supports ``.get``).
        password = self.config.get('key_password')
        if isinstance(password, str):
            password = password.encode()

        with open(self.config['private_key_path'], 'rb') as f:
            self.private_key = serialization.load_pem_private_key(
                f.read(),
                # cryptography expects None (not b'') for unencrypted keys.
                password=password or None,
            )

        with open(self.config['public_key_path'], 'rb') as f:
            self.public_key = serialization.load_pem_public_key(f.read())
    
    def generate_tokens(self, user_id, permissions, *, family=None,
                        access_ttl=timedelta(minutes=15),
                        refresh_ttl=timedelta(days=30)):
        """Generate an access token and a refresh token for a user.

        Both tokens are RS256-signed with ``self.private_key``. The access
        token carries the user's permissions and a ``kid`` header taken from
        ``config['key_id']``. The refresh token carries a ``family`` claim,
        and the family record (user id, issue time, current refresh ``jti``)
        is stored in Redis under ``refresh_family:<family>`` for
        ``refresh_ttl``.

        Args:
            user_id: Identifier stored in the ``user_id`` claim of both tokens.
            permissions: Value stored in the access token's ``permissions``
                claim.
            family: Existing refresh-token family id to continue. When
                ``None`` (the default) a new random family id is created.
            access_ttl: Lifetime of the access token.
            refresh_ttl: Lifetime of the refresh token and of its family
                record in Redis.

        Returns:
            dict with ``access_token``, ``refresh_token``, ``token_type``
            (``'Bearer'``) and ``expires_in`` (access-token lifetime in
            seconds).
        """
        # Read the clock once so iat/exp of both tokens and the stored
        # created_at all describe the same instant.
        now = datetime.now(timezone.utc)

        # Short-lived access token.
        access_payload = {
            'user_id': user_id,
            'permissions': permissions,
            'type': 'access',
            'iat': now,
            'exp': now + access_ttl,
            'jti': secrets.token_urlsafe(16),  # Unique token ID
        }
        access_token = jwt.encode(
            access_payload,
            self.private_key,
            algorithm='RS256',
            headers={'kid': self.config['key_id']},
        )

        # Long-lived refresh token, tied to a family for rotation.
        if family is None:
            family = secrets.token_urlsafe(16)
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'iat': now,
            'exp': now + refresh_ttl,
            'jti': secrets.token_urlsafe(16),
            'family': family,
        }
        refresh_token = jwt.encode(
            refresh_payload,
            self.private_key,
            algorithm='RS256',
        )

        # Record which refresh token is currently valid for this family.
        self.redis_client.setex(
            f"refresh_family:{family}",
            refresh_ttl,
            json.dumps({
                'user_id': user_id,
                'created_at': now.isoformat(),
                'token_jti': refresh_payload['jti'],
            }),
        )

        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
            # Derived from access_ttl so it cannot drift from the exp claim.
            'expires_in': int(access_ttl.total_seconds()),
        }
    
    def require_auth(self, required_permissions=None):
        """Decorator for protecting API endpoints"""
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                auth_header = request.headers.get('Authorization')
                
                if not auth_header or not auth_header.startswith('Bearer '):
                    return jsonify({'error': 'Missing or invalid authorization header'}), 401
                
                token = auth_header.split(' ')[1]
                
                try:
                    # Decode and verify token
                    payload = jwt.decode(
                        token,
                        self.public_key,
                        algorithms=['RS256'],
                        options={'verify_exp': True}
                    )
                    
                    # Verify token type
                    if payload.get('type') != 'access':
                        raise jwt.InvalidTokenError('Invalid token type')
                    
                    # Check if token is revoked
                    if self.is_token_revoked(payload['jti']):
                        raise jwt.InvalidTokenError('Token has been revoked')
                    
                    # Verify permissions
                    if required_permissions:
                        user_permissions = set(payload.get('permissions', []))
                        if not user_permissions.issuperset(set(required_permissions)):
                            return jsonify({'error': 'Insufficient permissions'}), 403
                    
                    # Add user context to request
                    g.user_id = payload['user_id']
                    g.permissions = payload['permissions']
                    g.token_jti = payload['jti']
                    
                    return f(*args, **kwargs)
                    
                except jwt.ExpiredSignatureError:
                    return jsonify({'error': 'Token has expired'}), 401
                except jwt.InvalidTokenError as e:
                    return jsonify({'error': f'Invalid token: {str(e)}'}), 401
                except Exception as e:
                    # Log unexpected errors without exposing details
                    self.logger.error(f"Authentication error: {str(e)}")
                    return jsonify({'error': 'Authentication failed'}), 401
            
            return decorated_function
        return decorator
    
    def refresh_access_token(self, refresh_token):
        """Exchange a refresh token for a new token pair, rotating it.

        The token must be RS256-signed, unexpired, of type ``refresh``, and
        its ``jti`` must match the one recorded in Redis for its family.
        A missing family record or a mismatching ``jti`` is treated as token
        reuse: the user's tokens are revoked and ``ValueError`` is raised.

        Args:
            refresh_token: Encoded refresh JWT presented by the client.

        Returns:
            The dict produced by ``generate_tokens`` for the new pair.

        Raises:
            ValueError: wrong token type, unknown family, or reuse detected.
            Exception: any other failure (e.g. JWT decoding errors) is
                logged and re-raised unchanged.
        """
        try:
            claims = jwt.decode(
                refresh_token,
                self.public_key,
                algorithms=['RS256'],
                options={'verify_exp': True},
            )

            if claims.get('type') != 'refresh':
                raise ValueError('Invalid token type')

            # Look up the family record for this token.
            family_key = f"refresh_family:{claims['family']}"
            stored = self.redis_client.get(family_key)

            if not stored:
                # No record for this family - possible token reuse attack.
                self.revoke_user_tokens(claims['user_id'])
                raise ValueError('Invalid refresh token family')

            family_info = json.loads(stored)

            if family_info['token_jti'] != claims['jti']:
                # An older token of the family was replayed: drop the family
                # record and revoke everything for the user.
                self.redis_client.delete(family_key)
                self.revoke_user_tokens(claims['user_id'])
                raise ValueError('Refresh token reuse detected')

            # Issue the replacement pair with freshly looked-up permissions.
            # NOTE(review): generate_tokens, as called here, also stores its
            # own refresh_family:* record for the new token, so the new
            # token's family claim differs from family_key - confirm that
            # keeping both records is intended.
            new_tokens = self.generate_tokens(
                claims['user_id'],
                self.get_user_permissions(claims['user_id']),
            )

            # Read the new refresh token's jti back out of the encoded token.
            issued = jwt.decode(
                new_tokens['refresh_token'],
                self.public_key,
                algorithms=['RS256'],
            )

            # Point the presented token's family record at the new jti so a
            # replay of the old token is detected by the check above.
            family_info['token_jti'] = issued['jti']
            self.redis_client.setex(
                family_key,
                timedelta(days=30),
                json.dumps(family_info),
            )

            # The presented refresh token is now spent.
            self.revoke_token(claims['jti'])

            return new_tokens

        except Exception as e:
            self.logger.error(f"Refresh token error: {str(e)}")
            raise
    
    def revoke_token(self, jti):
        """Revoke a specific token"""
        # Store revoked token JTI until expiration
        self.redis_client.setex(
            f"revoked_token:{jti}",
            timedelta(hours=1),  # Keep longer than token lifetime
            '1'
        )
    
    def is_token_revoked(self, jti):
        """Check if token is revoked"""
        return self.redis_client.exists(f"revoked_token:{jti}")