API Key Rotation Strategies

API Key Rotation Strategies

Regular key rotation limits the window of exposure if keys are compromised. However, rotation must be implemented carefully to avoid service disruptions. Support for multiple active keys during transition periods enables zero-downtime rotation.

# Python API key rotation implementation
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional
import asyncio

class KeyStatus(Enum):
    ACTIVE = "active"
    ROTATING = "rotating"
    DEPRECATED = "deprecated"
    REVOKED = "revoked"

class APIKeyRotationManager:
    def __init__(self, db, notification_service):
        self.db = db
        self.notification_service = notification_service
        
    async def initiate_rotation(self, user_id: str, key_id: str) -> dict:
        """Initiate key rotation process"""
        # Verify existing key
        old_key = await self.db.get_api_key(key_id)
        if not old_key or old_key.user_id != user_id:
            raise ValueError("Invalid key")
        
        # Generate new key
        generator = APIKeyGenerator(settings.SIGNING_SECRET)
        new_api_key = generator.generate_prefixed_key("sk_live")
        
        # Create new key record
        new_key_record = {
            'user_id': user_id,
            'key_hash': APIKey.hash_key(new_api_key),
            'status': KeyStatus.ACTIVE.value,
            'replaces_key_id': key_id,
            'created_at': datetime.utcnow()
        }
        
        new_key_id = await self.db.create_api_key(new_key_record)
        
        # Update old key status
        await self.db.update_api_key(key_id, {
            'status': KeyStatus.ROTATING.value,
            'replaced_by_key_id': new_key_id,
            'rotation_started_at': datetime.utcnow(),
            'rotation_deadline': datetime.utcnow() + timedelta(days=30)
        })
        
        # Notify user
        await self.notification_service.send_rotation_notification(
            user_id,
            old_key_id=key_id,
            new_key_id=new_key_id,
            deadline=new_key_record['rotation_deadline']
        )
        
        return {
            'new_key_id': new_key_id,
            'new_key': new_api_key,  # Only returned once
            'rotation_deadline': new_key_record['rotation_deadline']
        }
    
    async def complete_rotation(self, key_id: str):
        """Complete rotation by deprecating old key"""
        key = await self.db.get_api_key(key_id)
        
        if key.status != KeyStatus.ROTATING.value:
            raise ValueError("Key is not in rotation")
        
        # Check if new key is being used
        new_key = await self.db.get_api_key(key.replaced_by_key_id)
        if not new_key.last_used_at:
            raise ValueError("New key has not been used yet")
        
        # Deprecate old key
        await self.db.update_api_key(key_id, {
            'status': KeyStatus.DEPRECATED.value,
            'deprecated_at': datetime.utcnow()
        })
        
        # Schedule final revocation
        await self.schedule_revocation(key_id, days=7)
    
    async def auto_rotate_old_keys(self):
        """Automatically rotate keys older than threshold"""
        threshold = datetime.utcnow() - timedelta(days=90)
        
        old_keys = await self.db.find_api_keys({
            'status': KeyStatus.ACTIVE.value,
            'created_at': {'$lt': threshold},
            'auto_rotate': True
        })
        
        for key in old_keys:
            try:
                await self.initiate_rotation(key.user_id, key.id)
            except Exception as e:
                logger.error(f"Failed to rotate key {key.id}: {e}")
    
    async def monitor_rotation_progress(self):
        """Monitor ongoing rotations and send reminders"""
        rotating_keys = await self.db.find_api_keys({
            'status': KeyStatus.ROTATING.value
        })
        
        for key in rotating_keys:
            days_until_deadline = (key.rotation_deadline - datetime.utcnow()).days
            
            # Send reminders at specific intervals
            if days_until_deadline in [14, 7, 3, 1]:
                await self.notification_service.send_rotation_reminder(
                    key.user_id,
                    key.id,
                    days_until_deadline
                )
            
            # Force deprecation after deadline
            elif days_until_deadline < 0:
                await self.force_complete_rotation(key.id)

# Middleware for handling multiple active keys
class MultiKeyAuthMiddleware:
    def __init__(self, key_store):
        self.key_store = key_store
    
    async def authenticate(self, request):
        api_key = self.extract_api_key(request)
        if not api_key:
            return None
        
        # Verify key and get metadata
        result = await self.key_store.verify_api_key(api_key)
        if not result['valid']:
            return None
        
        # Check key status
        key_record = await self.key_store.get_key_by_hash(result['key_hash'])
        
        if key_record['status'] == KeyStatus.ROTATING.value:
            # Add warning header for rotating keys
            request.headers['X-API-Key-Warning'] = 'Key is being rotated. Please update to new key.'
        elif key_record['status'] == KeyStatus.DEPRECATED.value:
            # Add strong warning for deprecated keys
            request.headers['X-API-Key-Warning'] = 'Key is deprecated and will be revoked soon.'
        elif key_record['status'] == KeyStatus.REVOKED.value:
            # Reject revoked keys
            return None
        
        return result['userId']