Implementing the Authorization Code Flow

Implementing the Authorization Code Flow

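The authorization code flow provides the strongest security for web applications because tokens never pass through the browser. The user is redirected to the authorization server, the application receives a short-lived authorization code, and the application exchanges that code for tokens server-side. A correct implementation requires careful attention to security details: exact redirect URI matching, single-use codes that expire quickly, echoing the `state` parameter, and PKCE verification.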

# Python Flask implementation of OAuth 2.0 Authorization Server
import base64
import hashlib
import json
import os
import secrets
import time
from datetime import datetime, timedelta
from urllib.parse import urlencode, parse_qs

import jwt
import redis
from flask import Flask, request, redirect, jsonify, session, render_template

app = Flask(__name__)
# Session-signing key. A key generated at import time differs between worker
# processes and changes on every restart, which invalidates the session that
# carries the pending OAuth request between the GET and POST of the consent
# step. Prefer a stable key from the environment; fall back to a random
# per-process key (the original behaviour) for single-process development.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_bytes(32)

# Redis for storing authorization codes and tokens.
# decode_responses=True makes the client return str instead of bytes.
redis_client = redis.Redis(decode_responses=True)

class OAuth2AuthorizationServer:
    """Client registry, request validation and authorization-code issuance.

    Clients are kept in an in-memory dict; issued authorization codes are
    stored in Redis under ``auth_code:<code>``.
    """

    # Authorization code lifetime in seconds (10 minutes).
    AUTH_CODE_TTL = 600
    # PKCE transformations accepted at the authorization endpoint.
    SUPPORTED_CHALLENGE_METHODS = ('S256', 'plain')

    def __init__(self):
        # SECURITY: the literal fallback is a placeholder. Anyone who knows
        # it can forge access tokens, so set OAUTH_SIGNING_KEY in any real
        # deployment.
        self.signing_key = os.environ.get(
            'OAUTH_SIGNING_KEY', "your-secret-signing-key"
        )
        self.clients = {}  # In production, use database

    def register_client(self, client_id, client_secret, redirect_uris, scopes):
        """Register an OAuth client.

        Only a SHA-256 hex digest of ``client_secret`` is kept.
        ``redirect_uris`` and ``scopes`` are stored as given and later used
        for membership checks.
        """
        self.clients[client_id] = {
            'client_secret_hash': hashlib.sha256(client_secret.encode()).hexdigest(),
            'redirect_uris': redirect_uris,
            'allowed_scopes': scopes
        }

    def generate_authorization_code(self, client_id, user_id, redirect_uri,
                                  scope, code_challenge=None,
                                  code_challenge_method=None):
        """Generate authorization code with PKCE support.

        The code is a random URL-safe string. Its metadata is written to
        Redis as JSON with a TTL of ``AUTH_CODE_TTL`` seconds.

        ``code_challenge_method`` is persisted next to ``code_challenge`` so
        the token endpoint can tell an S256 challenge from a plain one. When
        a challenge is given without a method, ``'plain'`` is recorded; when
        no challenge is given, the method is stored as ``None``.

        Returns the authorization code string.
        """
        code = secrets.token_urlsafe(32)
        stored_method = None
        if code_challenge:
            stored_method = code_challenge_method or 'plain'

        code_data = {
            'client_id': client_id,
            'user_id': user_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'expires_at': time.time() + self.AUTH_CODE_TTL,
            'code_challenge': code_challenge,
            'code_challenge_method': stored_method
        }

        # Store in Redis with expiration
        redis_client.setex(
            f"auth_code:{code}",
            self.AUTH_CODE_TTL,
            json.dumps(code_data)
        )

        return code

    def validate_authorization_request(self, client_id, redirect_uri,
                                     response_type, scope, state,
                                     code_challenge=None,
                                     code_challenge_method=None):
        """Validate authorization request parameters.

        Returns ``{'valid': True}`` on success, otherwise a dict with an
        ``'error'`` key. Errors found *after* the client and redirect URI
        have been verified also carry ``'redirect_uri'`` and ``'state'``;
        their presence tells the caller it is safe to report the error by
        redirecting back to the client.
        """
        # Check client exists
        if client_id not in self.clients:
            return {'error': 'invalid_client'}

        client = self.clients[client_id]

        # Validate redirect URI (exact match against the registered list).
        # No redirect_uri is returned here: an unverified URI must never be
        # used as a redirect target.
        if redirect_uri not in client['redirect_uris']:
            return {'error': 'invalid_redirect_uri'}

        # Validate response type
        if response_type != 'code':
            return {
                'error': 'unsupported_response_type',
                'redirect_uri': redirect_uri,
                'state': state
            }

        # Validate scope; a missing scope is treated as an empty request.
        requested_scopes = (scope or '').split()
        allowed_scopes = client['allowed_scopes']
        if any(s not in allowed_scopes for s in requested_scopes):
            return {
                'error': 'invalid_scope',
                'redirect_uri': redirect_uri,
                'state': state
            }

        # Validate PKCE parameters; an omitted method means "plain".
        if code_challenge:
            method = code_challenge_method or 'plain'
            if method not in self.SUPPORTED_CHALLENGE_METHODS:
                return {
                    'error': 'invalid_request',
                    'error_description': 'Unsupported code_challenge_method',
                    'redirect_uri': redirect_uri,
                    'state': state
                }

        return {'valid': True}


auth_server = OAuth2AuthorizationServer()


def _redirect_with_params(redirect_uri, params):
    """Redirect to ``redirect_uri`` with ``params`` added to its query string.

    Entries whose value is ``None`` are dropped, so an absent ``state`` is
    not sent back as the literal text "None". If the URI already contains a
    query string the parameters are appended with ``&`` instead of ``?``.
    """
    query = urlencode({k: v for k, v in params.items() if v is not None})
    separator = '&' if '?' in redirect_uri else '?'
    return redirect(f"{redirect_uri}{separator}{query}")


@app.route('/oauth/authorize', methods=['GET'])
def authorize():
    """OAuth 2.0 authorization endpoint.

    Validates the query parameters, stores the pending request in the
    session and renders the consent page. Validation errors are reported by
    redirect when the redirect URI has been verified, otherwise as a JSON
    400 response.
    """
    # Extract parameters
    client_id = request.args.get('client_id')
    redirect_uri = request.args.get('redirect_uri')
    response_type = request.args.get('response_type')
    scope = request.args.get('scope', '')
    state = request.args.get('state')
    code_challenge = request.args.get('code_challenge')
    code_challenge_method = request.args.get('code_challenge_method', 'plain')

    # Validate request
    validation = auth_server.validate_authorization_request(
        client_id, redirect_uri, response_type, scope, state,
        code_challenge, code_challenge_method
    )

    if 'error' in validation:
        # Without a verified redirect URI the error is shown directly.
        if 'redirect_uri' not in validation:
            return jsonify({'error': validation['error']}), 400

        return _redirect_with_params(validation['redirect_uri'], {
            'error': validation['error'],
            'error_description': validation.get('error_description'),
            'state': validation.get('state')
        })

    # Store request in session for consent page
    session['oauth_request'] = {
        'client_id': client_id,
        'redirect_uri': redirect_uri,
        'scope': scope,
        'state': state,
        'code_challenge': code_challenge,
        'code_challenge_method': code_challenge_method
    }

    # Show consent page
    return render_template('consent.html',
                         client_id=client_id,
                         scope=scope.split())


@app.route('/oauth/authorize', methods=['POST'])
def authorize_consent():
    """Handle user consent.

    Consumes the pending request saved by the GET handler and redirects back
    to the client with either an authorization code or ``access_denied``.
    """
    # pop() rather than get(): each stored request may be answered once, so
    # re-submitting the consent form cannot mint additional codes.
    oauth_request = session.pop('oauth_request', None)
    if not oauth_request:
        return jsonify({'error': 'invalid_request'}), 400

    if request.form.get('consent') != 'allow':
        # User denied access
        return _redirect_with_params(oauth_request['redirect_uri'], {
            'error': 'access_denied',
            'state': oauth_request.get('state')
        })

    # The user id comes from your authentication system. Without one there
    # is nobody to issue a code for, so fail cleanly instead of raising
    # KeyError.
    user_id = session.get('user_id')
    if user_id is None:
        return jsonify({'error': 'login_required'}), 401

    # Generate authorization code; the challenge method is passed along so
    # the token endpoint can verify S256 challenges.
    code = auth_server.generate_authorization_code(
        oauth_request['client_id'],
        user_id,
        oauth_request['redirect_uri'],
        oauth_request['scope'],
        oauth_request.get('code_challenge'),
        oauth_request.get('code_challenge_method')
    )

    # Redirect with code
    return _redirect_with_params(oauth_request['redirect_uri'], {
        'code': code,
        'state': oauth_request.get('state')
    })

@app.route('/oauth/token', methods=['POST'])
def token():
    """OAuth 2.0 token endpoint.

    Reads ``grant_type`` from the form body and hands the request to the
    matching grant handler. Any other value yields a 400 response with
    ``unsupported_grant_type``.
    """
    requested_grant = request.form.get('grant_type')

    # Guard clauses: each handler name is looked up only when its grant
    # type is actually requested.
    if requested_grant == 'authorization_code':
        return handle_authorization_code_grant()
    if requested_grant == 'refresh_token':
        return handle_refresh_token_grant()
    if requested_grant == 'client_credentials':
        return handle_client_credentials_grant()

    return jsonify({'error': 'unsupported_grant_type'}), 400

def handle_authorization_code_grant():
    """Exchange authorization code for tokens.

    Reads ``code``, ``client_id``, ``client_secret``, ``redirect_uri`` and
    ``code_verifier`` from the form body. The request is rejected with
    ``invalid_client`` (401) when client authentication fails, with
    ``invalid_request`` (400) when a PKCE verifier is required but missing,
    and with ``invalid_grant`` (400) when the code is unknown, expired,
    already redeemed, bound to another client or redirect URI, or fails
    PKCE verification.

    On success returns a JSON body with a bearer access token and a refresh
    token, marked as non-cacheable.
    """
    code = request.form.get('code')
    client_id = request.form.get('client_id')
    client_secret = request.form.get('client_secret')
    redirect_uri = request.form.get('redirect_uri')
    code_verifier = request.form.get('code_verifier')

    # Validate client credentials
    if not validate_client_credentials(client_id, client_secret):
        return jsonify({'error': 'invalid_client'}), 401

    # Retrieve and validate authorization code
    code_key = f"auth_code:{code}"
    raw_code_data = redis_client.get(code_key)
    if not raw_code_data:
        return jsonify({'error': 'invalid_grant'}), 400

    code_data = json.loads(raw_code_data)

    # Validate code hasn't expired
    if time.time() > code_data['expires_at']:
        redis_client.delete(code_key)
        return jsonify({'error': 'invalid_grant'}), 400

    # Validate client_id matches
    if code_data['client_id'] != client_id:
        return jsonify({'error': 'invalid_grant'}), 400

    # Validate redirect_uri matches
    if code_data['redirect_uri'] != redirect_uri:
        return jsonify({'error': 'invalid_grant'}), 400

    # Validate PKCE if used
    stored_challenge = code_data.get('code_challenge')
    if stored_challenge:
        if not code_verifier:
            return jsonify({'error': 'invalid_request'}), 400

        # Recompute the challenge from the verifier using the method that
        # was recorded with the code; anything other than S256 is "plain".
        if code_data.get('code_challenge_method') == 'S256':
            challenge = base64.urlsafe_b64encode(
                hashlib.sha256(code_verifier.encode()).digest()
            ).decode().rstrip('=')
        else:
            challenge = code_verifier

        # Constant-time comparison; operands are encoded to bytes because
        # compare_digest only accepts ASCII-only str values.
        if not secrets.compare_digest(challenge.encode(),
                                      stored_challenge.encode()):
            return jsonify({'error': 'invalid_grant'}), 400

    # Consume the code. DEL reports how many keys it removed, so when two
    # requests race on the same code only the one that actually deleted it
    # may continue; the other is treated as a replay.
    if redis_client.delete(code_key) != 1:
        return jsonify({'error': 'invalid_grant'}), 400

    # Generate tokens
    access_token = generate_access_token(
        code_data['user_id'],
        code_data['client_id'],
        code_data['scope']
    )

    refresh_token = generate_refresh_token(
        code_data['user_id'],
        code_data['client_id'],
        code_data['scope']
    )

    response = jsonify({
        'access_token': access_token,
        'token_type': 'Bearer',
        'expires_in': 3600,
        'refresh_token': refresh_token,
        'scope': code_data['scope']
    })
    # Token responses must not be cached (RFC 6749 section 5.1).
    response.headers['Cache-Control'] = 'no-store'
    response.headers['Pragma'] = 'no-cache'
    return response

def generate_access_token(user_id, client_id, scope):
    """Generate JWT access token.

    The token is signed with HS256 using ``auth_server.signing_key`` and is
    valid for one hour from the moment of issue.

    Returns the encoded JWT as produced by ``jwt.encode``.
    """
    # Take a single timestamp so that exp is exactly iat + 3600, and use
    # epoch seconds instead of datetime.utcnow(), which is deprecated since
    # Python 3.12. JWT "iat"/"exp" claims are numeric epoch seconds anyway.
    issued_at = int(time.time())
    payload = {
        'sub': user_id,
        'client_id': client_id,
        'scope': scope,
        'iat': issued_at,
        'exp': issued_at + 3600,
        'token_type': 'access'
    }

    return jwt.encode(payload, auth_server.signing_key, algorithm='HS256')

def generate_refresh_token(user_id, client_id, scope):
    """Create an opaque refresh token and persist its metadata in Redis.

    The token is a random URL-safe string. Its record (user, client, scope
    and creation time) is stored as JSON under ``refresh_token:<token>``
    with a 30-day expiry.

    Returns the refresh token string.
    """
    lifetime_seconds = 86400 * 30  # 30 days
    refresh_token = secrets.token_urlsafe(32)

    record = json.dumps({
        'user_id': user_id,
        'client_id': client_id,
        'scope': scope,
        'created_at': time.time()
    })
    redis_client.setex(f"refresh_token:{refresh_token}", lifetime_seconds, record)

    return refresh_token