API Key Rotation Strategies
API Key Rotation Strategies
Regular key rotation limits the window of exposure if keys are compromised. However, rotation must be implemented carefully to avoid service disruptions. Support for multiple active keys during transition periods enables zero-downtime rotation.
# Python API key rotation implementation
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional
import asyncio
class KeyStatus(Enum):
ACTIVE = "active"
ROTATING = "rotating"
DEPRECATED = "deprecated"
REVOKED = "revoked"
class APIKeyRotationManager:
def __init__(self, db, notification_service):
self.db = db
self.notification_service = notification_service
async def initiate_rotation(self, user_id: str, key_id: str) -> dict:
"""Initiate key rotation process"""
# Verify existing key
old_key = await self.db.get_api_key(key_id)
if not old_key or old_key.user_id != user_id:
raise ValueError("Invalid key")
# Generate new key
generator = APIKeyGenerator(settings.SIGNING_SECRET)
new_api_key = generator.generate_prefixed_key("sk_live")
# Create new key record
new_key_record = {
'user_id': user_id,
'key_hash': APIKey.hash_key(new_api_key),
'status': KeyStatus.ACTIVE.value,
'replaces_key_id': key_id,
'created_at': datetime.utcnow()
}
new_key_id = await self.db.create_api_key(new_key_record)
# Update old key status
await self.db.update_api_key(key_id, {
'status': KeyStatus.ROTATING.value,
'replaced_by_key_id': new_key_id,
'rotation_started_at': datetime.utcnow(),
'rotation_deadline': datetime.utcnow() + timedelta(days=30)
})
# Notify user
await self.notification_service.send_rotation_notification(
user_id,
old_key_id=key_id,
new_key_id=new_key_id,
deadline=new_key_record['rotation_deadline']
)
return {
'new_key_id': new_key_id,
'new_key': new_api_key, # Only returned once
'rotation_deadline': new_key_record['rotation_deadline']
}
async def complete_rotation(self, key_id: str):
"""Complete rotation by deprecating old key"""
key = await self.db.get_api_key(key_id)
if key.status != KeyStatus.ROTATING.value:
raise ValueError("Key is not in rotation")
# Check if new key is being used
new_key = await self.db.get_api_key(key.replaced_by_key_id)
if not new_key.last_used_at:
raise ValueError("New key has not been used yet")
# Deprecate old key
await self.db.update_api_key(key_id, {
'status': KeyStatus.DEPRECATED.value,
'deprecated_at': datetime.utcnow()
})
# Schedule final revocation
await self.schedule_revocation(key_id, days=7)
async def auto_rotate_old_keys(self):
"""Automatically rotate keys older than threshold"""
threshold = datetime.utcnow() - timedelta(days=90)
old_keys = await self.db.find_api_keys({
'status': KeyStatus.ACTIVE.value,
'created_at': {'$lt': threshold},
'auto_rotate': True
})
for key in old_keys:
try:
await self.initiate_rotation(key.user_id, key.id)
except Exception as e:
logger.error(f"Failed to rotate key {key.id}: {e}")
async def monitor_rotation_progress(self):
"""Monitor ongoing rotations and send reminders"""
rotating_keys = await self.db.find_api_keys({
'status': KeyStatus.ROTATING.value
})
for key in rotating_keys:
days_until_deadline = (key.rotation_deadline - datetime.utcnow()).days
# Send reminders at specific intervals
if days_until_deadline in [14, 7, 3, 1]:
await self.notification_service.send_rotation_reminder(
key.user_id,
key.id,
days_until_deadline
)
# Force deprecation after deadline
elif days_until_deadline < 0:
await self.force_complete_rotation(key.id)
# Middleware for handling multiple active keys
class MultiKeyAuthMiddleware:
def __init__(self, key_store):
self.key_store = key_store
async def authenticate(self, request):
api_key = self.extract_api_key(request)
if not api_key:
return None
# Verify key and get metadata
result = await self.key_store.verify_api_key(api_key)
if not result['valid']:
return None
# Check key status
key_record = await self.key_store.get_key_by_hash(result['key_hash'])
if key_record['status'] == KeyStatus.ROTATING.value:
# Add warning header for rotating keys
request.headers['X-API-Key-Warning'] = 'Key is being rotated. Please update to new key.'
elif key_record['status'] == KeyStatus.DEPRECATED.value:
# Add strong warning for deprecated keys
request.headers['X-API-Key-Warning'] = 'Key is deprecated and will be revoked soon.'
elif key_record['status'] == KeyStatus.REVOKED.value:
# Reject revoked keys
return None
return result['userId']