Planning Key Rotation Lifecycle

Planning Key Rotation Lifecycle

Effective key rotation begins with understanding the complete lifecycle of SSH keys in your environment. Each key serves specific purposes, connects particular users to systems, and has different risk profiles that influence rotation requirements. Developing a comprehensive rotation strategy requires mapping these relationships and establishing appropriate rotation schedules based on security requirements and operational constraints.

Key rotation frequency depends on multiple factors including the sensitivity of accessed systems, compliance requirements, and the key's exposure level. High-privilege keys accessing production systems might require monthly rotation, while keys for development environments could rotate quarterly. Emergency response keys might never rotate but require additional protective controls. Understanding these nuances enables creation of rotation policies that balance security with operational feasibility.

Implement a key rotation planning framework:

#!/usr/bin/env python3
# key-rotation-planner.py
# Comprehensive SSH key rotation planning system

import json
import sqlite3
from datetime import datetime, timedelta
import hashlib
from enum import Enum
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class KeyPriority(Enum):
    CRITICAL = 1  # Production root/admin keys
    HIGH = 2      # Production service accounts
    MEDIUM = 3    # Development/staging access
    LOW = 4       # Test environment access

class KeyRotationPlanner:
    def __init__(self, db_path, config_path):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        
        with open(config_path, 'r') as f:
            self.config = json.load(f)
            
        self._init_database()
        
    def _init_database(self):
        """Initialize rotation planning database"""
        cursor = self.conn.cursor()
        
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS ssh_keys (
            key_id TEXT PRIMARY KEY,
            fingerprint TEXT UNIQUE,
            key_type TEXT,
            created_date DATE,
            last_rotated DATE,
            next_rotation DATE,
            priority INTEGER,
            owner TEXT,
            purpose TEXT,
            systems TEXT,
            compliance_requirements TEXT,
            rotation_count INTEGER DEFAULT 0,
            active BOOLEAN DEFAULT 1
        )''')
        
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS rotation_history (
            rotation_id INTEGER PRIMARY KEY AUTOINCREMENT,
            key_id TEXT,
            old_fingerprint TEXT,
            new_fingerprint TEXT,
            rotation_date DATE,
            rotation_reason TEXT,
            performed_by TEXT,
            success BOOLEAN,
            affected_systems INTEGER,
            downtime_minutes INTEGER,
            FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
        )''')
        
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS rotation_schedule (
            schedule_id INTEGER PRIMARY KEY AUTOINCREMENT,
            key_id TEXT,
            scheduled_date DATE,
            notification_sent BOOLEAN DEFAULT 0,
            approved BOOLEAN DEFAULT 0,
            approved_by TEXT,
            approval_date DATE,
            maintenance_window TEXT,
            FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
        )''')
        
        self.conn.commit()
        
    def register_key(self, fingerprint, key_type, owner, purpose, systems, priority):
        """Register a new SSH key for rotation tracking"""
        key_id = hashlib.sha256(f"{fingerprint}{owner}".encode()).hexdigest()[:12]
        
        # Determine rotation frequency based on priority
        rotation_days = self.config['rotation_intervals'][priority.name]
        next_rotation = datetime.now() + timedelta(days=rotation_days)
        
        # Check compliance requirements
        compliance_reqs = self.determine_compliance_requirements(purpose, systems)
        
        cursor = self.conn.cursor()
        cursor.execute('''
        INSERT OR REPLACE INTO ssh_keys 
        (key_id, fingerprint, key_type, created_date, last_rotated, 
         next_rotation, priority, owner, purpose, systems, 
         compliance_requirements)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            key_id, fingerprint, key_type, datetime.now().date(),
            datetime.now().date(), next_rotation.date(), priority.value,
            owner, purpose, json.dumps(systems), json.dumps(compliance_reqs)
        ))
        
        self.conn.commit()
        
        # Schedule rotation
        self.schedule_rotation(key_id, next_rotation)
        
        return key_id
        
    def determine_compliance_requirements(self, purpose, systems):
        """Determine applicable compliance requirements"""
        requirements = []
        
        # Check system classifications
        for system in systems:
            if 'payment' in system.lower() or 'pci' in system.lower():
                requirements.append({
                    'framework': 'PCI-DSS',
                    'requirement': '3.6.4',
                    'max_rotation_days': 90
                })
                
            if 'healthcare' in system.lower() or 'phi' in system.lower():
                requirements.append({
                    'framework': 'HIPAA',
                    'requirement': '164.312(a)(2)(iv)',
                    'max_rotation_days': 180
                })
                
            if 'financial' in system.lower():
                requirements.append({
                    'framework': 'SOX',
                    'requirement': 'Section 404',
                    'max_rotation_days': 90
                })
                
        # Check purpose-based requirements
        if 'root' in purpose.lower() or 'admin' in purpose.lower():
            requirements.append({
                'framework': 'Best Practice',
                'requirement': 'Privileged Access',
                'max_rotation_days': 30
            })
            
        return requirements
        
    def schedule_rotation(self, key_id, rotation_date):
        """Schedule a key rotation"""
        cursor = self.conn.cursor()
        
        # Check for existing schedule
        cursor.execute('''
        SELECT schedule_id FROM rotation_schedule 
        WHERE key_id = ? AND scheduled_date = ?
        ''', (key_id, rotation_date.date()))
        
        if not cursor.fetchone():
            # Determine maintenance window
            maintenance_window = self.find_maintenance_window(key_id, rotation_date)
            
            cursor.execute('''
            INSERT INTO rotation_schedule 
            (key_id, scheduled_date, maintenance_window)
            VALUES (?, ?, ?)
            ''', (key_id, rotation_date.date(), maintenance_window))
            
            self.conn.commit()
            
    def find_maintenance_window(self, key_id, preferred_date):
        """Find suitable maintenance window for rotation"""
        cursor = self.conn.cursor()
        cursor.execute('SELECT systems FROM ssh_keys WHERE key_id = ?', (key_id,))
        
        result = cursor.fetchone()
        if result:
            systems = json.loads(result['systems'])
            
            # Check system maintenance windows
            windows = []
            for system in systems:
                window = self.config['maintenance_windows'].get(
                    system, 
                    self.config['maintenance_windows']['default']
                )
                windows.append(window)
                
            # Find common window
            # For simplicity, return the most restrictive window
            return min(windows, key=lambda w: w['duration_hours'])
            
        return self.config['maintenance_windows']['default']
        
    def generate_rotation_plan(self, days_ahead=30):
        """Generate rotation plan for upcoming period"""
        cursor = self.conn.cursor()
        
        end_date = datetime.now() + timedelta(days=days_ahead)
        
        cursor.execute('''
        SELECT k.*, s.scheduled_date, s.maintenance_window
        FROM ssh_keys k
        JOIN rotation_schedule s ON k.key_id = s.key_id
        WHERE s.scheduled_date <= ? 
        AND s.approved = 0
        AND k.active = 1
        ORDER BY k.priority, s.scheduled_date
        ''', (end_date.date(),))
        
        plan = {
            'generated_date': datetime.now().isoformat(),
            'period_days': days_ahead,
            'rotations': []
        }
        
        for row in cursor.fetchall():
            rotation_info = {
                'key_id': row['key_id'],
                'owner': row['owner'],
                'purpose': row['purpose'],
                'priority': KeyPriority(row['priority']).name,
                'scheduled_date': row['scheduled_date'],
                'systems': json.loads(row['systems']),
                'compliance': json.loads(row['compliance_requirements']),
                'maintenance_window': json.loads(row['maintenance_window']),
                'rotation_count': row['rotation_count'],
                'risk_score': self.calculate_rotation_risk(row)
            }
            
            plan['rotations'].append(rotation_info)
            
        # Group by date
        plan['summary'] = self.summarize_rotation_plan(plan['rotations'])
        
        return plan
        
    def calculate_rotation_risk(self, key_info):
        """Calculate risk score for rotation"""
        risk_score = 0
        
        # Priority-based risk
        risk_score += (5 - key_info['priority']) * 20
        
        # System count risk
        systems = json.loads(key_info['systems'])
        risk_score += len(systems) * 5
        
        # Time since last rotation
        last_rotated = datetime.strptime(key_info['last_rotated'], '%Y-%m-%d')
        days_since = (datetime.now() - last_rotated).days
        if days_since > 180:
            risk_score += 20
        elif days_since > 90:
            risk_score += 10
            
        # Compliance requirements
        compliance = json.loads(key_info['compliance_requirements'])
        risk_score += len(compliance) * 10
        
        return min(100, risk_score)
        
    def send_rotation_notifications(self):
        """Send notifications for upcoming rotations"""
        cursor = self.conn.cursor()
        
        # Get rotations needing notification
        notification_days = self.config['notification_days']
        notification_date = datetime.now() + timedelta(days=notification_days)
        
        cursor.execute('''
        SELECT k.*, s.scheduled_date, s.schedule_id
        FROM ssh_keys k
        JOIN rotation_schedule s ON k.key_id = s.key_id
        WHERE s.scheduled_date <= ?
        AND s.notification_sent = 0
        AND k.active = 1
        ''', (notification_date.date(),))
        
        for row in cursor.fetchall():
            self.send_notification(row)
            
            # Mark as notified
            cursor.execute('''
            UPDATE rotation_schedule 
            SET notification_sent = 1 
            WHERE schedule_id = ?
            ''', (row['schedule_id'],))
            
        self.conn.commit()
        
    def send_notification(self, rotation_info):
        """Send rotation notification email"""
        subject = f"SSH Key Rotation Required - {rotation_info['purpose']}"
        
        body = f"""
        SSH Key Rotation Notification
        
        Key Details:
        - Purpose: {rotation_info['purpose']}
        - Owner: {rotation_info['owner']}
        - Priority: {KeyPriority(rotation_info['priority']).name}
        - Scheduled Date: {rotation_info['scheduled_date']}
        - Affected Systems: {', '.join(json.loads(rotation_info['systems']))}
        
        Compliance Requirements:
        {self.format_compliance_requirements(rotation_info['compliance_requirements'])}
        
        Action Required:
        Please approve this rotation by visiting: {self.config['approval_url']}/approve/{rotation_info['key_id']}
        
        For assistance, contact the security team.
        """
        
        msg = MIMEMultipart()
        msg['From'] = self.config['smtp']['from_address']
        msg['To'] = rotation_info['owner']
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        
        # Send email
        with smtplib.SMTP(self.config['smtp']['server'], self.config['smtp']['port']) as server:
            if self.config['smtp'].get('use_tls'):
                server.starttls()
            if self.config['smtp'].get('username'):
                server.login(self.config['smtp']['username'], self.config['smtp']['password'])
            server.send_message(msg)
            
    def approve_rotation(self, key_id, approver):
        """Approve a scheduled rotation"""
        cursor = self.conn.cursor()
        
        cursor.execute('''
        UPDATE rotation_schedule 
        SET approved = 1, approved_by = ?, approval_date = ?
        WHERE key_id = ? AND approved = 0
        ''', (approver, datetime.now().date(), key_id))
        
        self.conn.commit()
        
        return cursor.rowcount > 0

# Rotation planning CLI
if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='SSH Key Rotation Planner')
    parser.add_argument('action', choices=['register', 'plan', 'notify', 'approve'])
    parser.add_argument('--fingerprint', help='Key fingerprint')
    parser.add_argument('--owner', help='Key owner')
    parser.add_argument('--purpose', help='Key purpose')
    parser.add_argument('--systems', nargs='+', help='Systems accessed')
    parser.add_argument('--priority', choices=['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'])
    parser.add_argument('--days', type=int, default=30, help='Days ahead for planning')
    parser.add_argument('--key-id', help='Key ID for approval')
    parser.add_argument('--approver', help='Approver name')
    
    args = parser.parse_args()
    
    planner = KeyRotationPlanner('/var/lib/ssh-rotation/rotation.db', 
                                '/etc/ssh-rotation/config.json')
    
    if args.action == 'register':
        key_id = planner.register_key(
            args.fingerprint,
            'ed25519',  # Detected from key
            args.owner,
            args.purpose,
            args.systems,
            KeyPriority[args.priority]
        )
        print(f"Registered key: {key_id}")
        
    elif args.action == 'plan':
        plan = planner.generate_rotation_plan(args.days)
        print(json.dumps(plan, indent=2))
        
    elif args.action == 'notify':
        planner.send_rotation_notifications()
        print("Notifications sent")
        
    elif args.action == 'approve':
        success = planner.approve_rotation(args.key_id, args.approver)
        print(f"Approval {'successful' if success else 'failed'}")