Implementing the Authorization Code Flow
Implementing the Authorization Code Flow
The authorization code flow provides the highest security for web applications by keeping tokens away from the browser. This flow involves redirecting users to the authorization server, obtaining an authorization code, and exchanging it for tokens server-side. Proper implementation requires careful attention to security details.
# Python Flask implementation of OAuth 2.0 Authorization Server
from flask import Flask, request, redirect, jsonify, session, render_template
import secrets
import hashlib
import time
from datetime import datetime, timedelta
from urllib.parse import urlencode, parse_qs
import jwt
import redis
app = Flask(__name__)
app.secret_key = secrets.token_bytes(32)
# Redis for storing authorization codes and tokens
redis_client = redis.Redis(decode_responses=True)
class OAuth2AuthorizationServer:
def __init__(self):
self.signing_key = "your-secret-signing-key"
self.clients = {} # In production, use database
def register_client(self, client_id, client_secret, redirect_uris, scopes):
"""Register an OAuth client"""
self.clients[client_id] = {
'client_secret_hash': hashlib.sha256(client_secret.encode()).hexdigest(),
'redirect_uris': redirect_uris,
'allowed_scopes': scopes
}
def generate_authorization_code(self, client_id, user_id, redirect_uri,
scope, code_challenge=None):
"""Generate authorization code with PKCE support"""
code = secrets.token_urlsafe(32)
code_data = {
'client_id': client_id,
'user_id': user_id,
'redirect_uri': redirect_uri,
'scope': scope,
'expires_at': time.time() + 600, # 10 minutes
'code_challenge': code_challenge
}
# Store in Redis with expiration
redis_client.setex(
f"auth_code:{code}",
600,
json.dumps(code_data)
)
return code
def validate_authorization_request(self, client_id, redirect_uri,
response_type, scope, state,
code_challenge=None,
code_challenge_method=None):
"""Validate authorization request parameters"""
# Check client exists
if client_id not in self.clients:
return {'error': 'invalid_client'}
client = self.clients[client_id]
# Validate redirect URI
if redirect_uri not in client['redirect_uris']:
return {'error': 'invalid_redirect_uri'}
# Validate response type
if response_type != 'code':
return {
'error': 'unsupported_response_type',
'redirect_uri': redirect_uri,
'state': state
}
# Validate scope
requested_scopes = scope.split()
for s in requested_scopes:
if s not in client['allowed_scopes']:
return {
'error': 'invalid_scope',
'redirect_uri': redirect_uri,
'state': state
}
# Validate PKCE parameters
if code_challenge:
if code_challenge_method not in ['S256', 'plain']:
return {
'error': 'invalid_request',
'error_description': 'Unsupported code_challenge_method',
'redirect_uri': redirect_uri,
'state': state
}
return {'valid': True}
auth_server = OAuth2AuthorizationServer()
@app.route('/oauth/authorize', methods=['GET'])
def authorize():
"""OAuth 2.0 authorization endpoint"""
# Extract parameters
client_id = request.args.get('client_id')
redirect_uri = request.args.get('redirect_uri')
response_type = request.args.get('response_type')
scope = request.args.get('scope', '')
state = request.args.get('state')
code_challenge = request.args.get('code_challenge')
code_challenge_method = request.args.get('code_challenge_method', 'plain')
# Validate request
validation = auth_server.validate_authorization_request(
client_id, redirect_uri, response_type, scope, state,
code_challenge, code_challenge_method
)
if 'error' in validation:
# Handle errors
if 'redirect_uri' in validation:
error_params = {
'error': validation['error'],
'state': validation.get('state')
}
return redirect(f"{validation['redirect_uri']}?{urlencode(error_params)}")
else:
return jsonify({'error': validation['error']}), 400
# Store request in session for consent page
session['oauth_request'] = {
'client_id': client_id,
'redirect_uri': redirect_uri,
'scope': scope,
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': code_challenge_method
}
# Show consent page
return render_template('consent.html',
client_id=client_id,
scope=scope.split())
@app.route('/oauth/authorize', methods=['POST'])
def authorize_consent():
"""Handle user consent"""
oauth_request = session.get('oauth_request')
if not oauth_request:
return jsonify({'error': 'invalid_request'}), 400
if request.form.get('consent') != 'allow':
# User denied access
error_params = {
'error': 'access_denied',
'state': oauth_request.get('state')
}
return redirect(f"{oauth_request['redirect_uri']}?{urlencode(error_params)}")
# Generate authorization code
user_id = session['user_id'] # From your authentication system
code = auth_server.generate_authorization_code(
oauth_request['client_id'],
user_id,
oauth_request['redirect_uri'],
oauth_request['scope'],
oauth_request.get('code_challenge')
)
# Redirect with code
params = {
'code': code,
'state': oauth_request.get('state')
}
return redirect(f"{oauth_request['redirect_uri']}?{urlencode(params)}")
@app.route('/oauth/token', methods=['POST'])
def token():
"""OAuth 2.0 token endpoint"""
grant_type = request.form.get('grant_type')
if grant_type == 'authorization_code':
return handle_authorization_code_grant()
elif grant_type == 'refresh_token':
return handle_refresh_token_grant()
elif grant_type == 'client_credentials':
return handle_client_credentials_grant()
else:
return jsonify({'error': 'unsupported_grant_type'}), 400
def handle_authorization_code_grant():
"""Exchange authorization code for tokens"""
code = request.form.get('code')
client_id = request.form.get('client_id')
client_secret = request.form.get('client_secret')
redirect_uri = request.form.get('redirect_uri')
code_verifier = request.form.get('code_verifier')
# Validate client credentials
if not validate_client_credentials(client_id, client_secret):
return jsonify({'error': 'invalid_client'}), 401
# Retrieve and validate authorization code
code_data = redis_client.get(f"auth_code:{code}")
if not code_data:
return jsonify({'error': 'invalid_grant'}), 400
code_data = json.loads(code_data)
# Validate code hasn't expired
if time.time() > code_data['expires_at']:
redis_client.delete(f"auth_code:{code}")
return jsonify({'error': 'invalid_grant'}), 400
# Validate client_id matches
if code_data['client_id'] != client_id:
return jsonify({'error': 'invalid_grant'}), 400
# Validate redirect_uri matches
if code_data['redirect_uri'] != redirect_uri:
return jsonify({'error': 'invalid_grant'}), 400
# Validate PKCE if used
if code_data.get('code_challenge'):
if not code_verifier:
return jsonify({'error': 'invalid_request'}), 400
# Verify code challenge
if code_data.get('code_challenge_method') == 'S256':
challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode().rstrip('=')
else:
challenge = code_verifier
if challenge != code_data['code_challenge']:
return jsonify({'error': 'invalid_grant'}), 400
# Delete used authorization code
redis_client.delete(f"auth_code:{code}")
# Generate tokens
access_token = generate_access_token(
code_data['user_id'],
code_data['client_id'],
code_data['scope']
)
refresh_token = generate_refresh_token(
code_data['user_id'],
code_data['client_id'],
code_data['scope']
)
return jsonify({
'access_token': access_token,
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': refresh_token,
'scope': code_data['scope']
})
def generate_access_token(user_id, client_id, scope):
"""Generate JWT access token"""
payload = {
'sub': user_id,
'client_id': client_id,
'scope': scope,
'iat': datetime.utcnow(),
'exp': datetime.utcnow() + timedelta(hours=1),
'token_type': 'access'
}
return jwt.encode(payload, auth_server.signing_key, algorithm='HS256')
def generate_refresh_token(user_id, client_id, scope):
"""Generate refresh token"""
token = secrets.token_urlsafe(32)
# Store refresh token data
redis_client.setex(
f"refresh_token:{token}",
86400 * 30, # 30 days
json.dumps({
'user_id': user_id,
'client_id': client_id,
'scope': scope,
'created_at': time.time()
})
)
return token