Broken Object Level Authorization (BOLA)
Broken Object Level Authorization (BOLA)
Broken Object Level Authorization, also known as Insecure Direct Object References (IDOR), remains the most critical API vulnerability. This flaw occurs when APIs expose object identifiers without properly verifying that the requesting user has permission to access that specific object. Attackers exploit BOLA by manipulating identifiers to access unauthorized data.
The impact of BOLA vulnerabilities can be devastating. Attackers can access sensitive user data, modify records belonging to other users, or delete critical information. In multi-tenant applications, BOLA can lead to complete data breaches across customer boundaries. The simplicity of exploiting BOLA makes it a favorite target for both automated scanners and manual attackers.
# Python example: Vulnerable and secure implementations
# VULNERABLE: Direct object access without authorization
@app.route('/api/users/<user_id>/profile', methods=['GET'])
def get_user_profile_vulnerable(user_id):
# NO authorization check - any authenticated user can access any profile
user = db.users.find_one({'_id': user_id})
if user:
return jsonify(user), 200
return jsonify({'error': 'User not found'}), 404
# SECURE: Proper object-level authorization
from functools import wraps
from flask import g, jsonify
def owns_resource(resource_type):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract resource ID based on resource type
if resource_type == 'user':
resource_id = kwargs.get('user_id')
# Check if requesting user owns this profile
if g.current_user['id'] != resource_id and not g.current_user.get('is_admin'):
return jsonify({'error': 'Unauthorized access'}), 403
elif resource_type == 'order':
order_id = kwargs.get('order_id')
# Fetch order and verify ownership
order = db.orders.find_one({'_id': order_id})
if not order:
return jsonify({'error': 'Order not found'}), 404
if order['user_id'] != g.current_user['id'] and not g.current_user.get('is_admin'):
return jsonify({'error': 'Unauthorized access'}), 403
# Store order in context for the route
g.order = order
elif resource_type == 'document':
document_id = kwargs.get('document_id')
# Check document permissions
document = db.documents.find_one({'_id': document_id})
if not document:
return jsonify({'error': 'Document not found'}), 404
# Check various permission models
has_permission = (
document['owner_id'] == g.current_user['id'] or
g.current_user['id'] in document.get('shared_with', []) or
g.current_user.get('is_admin') or
check_group_permissions(g.current_user, document)
)
if not has_permission:
return jsonify({'error': 'Unauthorized access'}), 403
g.document = document
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/api/users/<user_id>/profile', methods=['GET'])
@require_auth
@owns_resource('user')
def get_user_profile_secure(user_id):
# Authorization already checked by decorator
user = db.users.find_one({'_id': user_id})
# Remove sensitive fields based on requester
if g.current_user['id'] != user_id:
user = filter_sensitive_fields(user, ['email', 'phone', 'address'])
return jsonify(user), 200
@app.route('/api/orders/<order_id>', methods=['PUT'])
@require_auth
@owns_resource('order')
def update_order_secure(order_id):
# Order already fetched and authorized in decorator
order = g.order
updates = request.get_json()
# Additional business logic validation
if order['status'] == 'completed':
return jsonify({'error': 'Cannot modify completed orders'}), 400
# Apply updates with field-level authorization
allowed_fields = get_allowed_fields_for_user(g.current_user, 'order')
filtered_updates = {k: v for k, v in updates.items() if k in allowed_fields}
db.orders.update_one({'_id': order_id}, {'$set': filtered_updates})
return jsonify({'message': 'Order updated successfully'}), 200
# Advanced: Hierarchical authorization with caching
class AuthorizationService:
def __init__(self, cache_client):
self.cache = cache_client
def check_resource_access(self, user_id, resource_type, resource_id, action='read'):
# Check cache first
cache_key = f"auth:{user_id}:{resource_type}:{resource_id}:{action}"
cached_result = self.cache.get(cache_key)
if cached_result is not None:
return cached_result
# Perform authorization check
has_access = self._perform_authorization_check(
user_id, resource_type, resource_id, action
)
# Cache result with appropriate TTL
ttl = 300 if has_access else 60 # Cache denials for shorter time
self.cache.setex(cache_key, ttl, has_access)
return has_access