Planning Key Rotation Lifecycle
Planning Key Rotation Lifecycle
Effective key rotation begins with understanding the complete lifecycle of SSH keys in your environment. Each key serves a specific purpose, connects particular users to particular systems, and carries its own risk profile, all of which influence its rotation requirements. Developing a comprehensive rotation strategy requires mapping these relationships and establishing appropriate rotation schedules based on security requirements and operational constraints.
Key rotation frequency depends on multiple factors including the sensitivity of accessed systems, compliance requirements, and the key's exposure level. High-privilege keys accessing production systems might require monthly rotation, while keys for development environments could rotate quarterly. Emergency response keys might never rotate but require additional protective controls. Understanding these nuances enables creation of rotation policies that balance security with operational feasibility.
Implement a key rotation planning framework:
#!/usr/bin/env python3
# key-rotation-planner.py
# Comprehensive SSH key rotation planning system
import json
import sqlite3
from datetime import datetime, timedelta
import hashlib
from enum import Enum
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class KeyPriority(Enum):
    """Rotation priority tier of an SSH key.

    Lower values denote more sensitive keys. The member *name* is used to
    look up the rotation interval in the configuration, and the member
    *value* is what gets stored in the ``ssh_keys.priority`` column.
    """

    # Production root/admin keys
    CRITICAL = 1
    # Production service accounts
    HIGH = 2
    # Development/staging access
    MEDIUM = 3
    # Test environment access
    LOW = 4
class KeyRotationPlanner:
    """Plan, schedule, and track SSH key rotations in a SQLite database.

    The planner keeps three tables: ``ssh_keys`` (one row per tracked key),
    ``rotation_schedule`` (pending/approved rotations) and
    ``rotation_history``. Behaviour is driven by a JSON configuration file;
    the keys read by this class are ``rotation_intervals``,
    ``maintenance_windows``, ``notification_days``, ``approval_url`` and
    ``smtp``.

    Dates are stored as ISO ``YYYY-MM-DD`` text and list/dict values
    (systems, compliance requirements, maintenance windows) as JSON text.
    """

    def __init__(self, db_path, config_path):
        """Load the configuration, open the database and create the tables.

        Args:
            db_path: Path of the SQLite database file.
            config_path: Path of the JSON configuration file.
        """
        # Read the configuration first so that a bad config path does not
        # leave an open database connection behind.
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = json.load(f)
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._init_database()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _init_database(self):
        """Initialize rotation planning database"""
        with self.conn:
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS ssh_keys (
                    key_id TEXT PRIMARY KEY,
                    fingerprint TEXT UNIQUE,
                    key_type TEXT,
                    created_date DATE,
                    last_rotated DATE,
                    next_rotation DATE,
                    priority INTEGER,
                    owner TEXT,
                    purpose TEXT,
                    systems TEXT,
                    compliance_requirements TEXT,
                    rotation_count INTEGER DEFAULT 0,
                    active BOOLEAN DEFAULT 1
                )''')
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS rotation_history (
                    rotation_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    key_id TEXT,
                    old_fingerprint TEXT,
                    new_fingerprint TEXT,
                    rotation_date DATE,
                    rotation_reason TEXT,
                    performed_by TEXT,
                    success BOOLEAN,
                    affected_systems INTEGER,
                    downtime_minutes INTEGER,
                    FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
                )''')
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS rotation_schedule (
                    schedule_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    key_id TEXT,
                    scheduled_date DATE,
                    notification_sent BOOLEAN DEFAULT 0,
                    approved BOOLEAN DEFAULT 0,
                    approved_by TEXT,
                    approval_date DATE,
                    maintenance_window TEXT,
                    FOREIGN KEY(key_id) REFERENCES ssh_keys(key_id)
                )''')

    def register_key(self, fingerprint, key_type, owner, purpose, systems, priority):
        """Register a new SSH key for rotation tracking.

        Args:
            fingerprint: Key fingerprint (unique per key).
            key_type: Key algorithm label, stored as given.
            owner: Key owner; also used as the notification recipient.
            purpose: Free-text purpose, inspected for compliance rules.
            systems: Iterable of system names the key can access
                (``None`` is treated as no systems).
            priority: Enum member whose ``name`` selects the interval in
                ``config['rotation_intervals']`` and whose ``value`` is stored.

        Returns:
            The 12-character key id derived from fingerprint and owner.
        """
        if systems is None:
            systems = []
        key_id = hashlib.sha256(f"{fingerprint}{owner}".encode()).hexdigest()[:12]

        # Check compliance requirements
        compliance_reqs = self.determine_compliance_requirements(purpose, systems)

        # Rotation frequency comes from the priority tier, but must never
        # exceed the strictest applicable compliance maximum.
        rotation_days = self.config['rotation_intervals'][priority.name]
        caps = [
            req['max_rotation_days'] for req in compliance_reqs
            if req.get('max_rotation_days') is not None
        ]
        if caps:
            rotation_days = min(rotation_days, min(caps))

        # Take a single timestamp so all stored dates agree even when the
        # call straddles midnight.
        now = datetime.now()
        today = now.date().isoformat()
        next_rotation = now + timedelta(days=rotation_days)

        # NOTE: INSERT OR REPLACE resets rotation_count/active when the same
        # key is registered again.
        with self.conn:
            self.conn.execute('''
                INSERT OR REPLACE INTO ssh_keys
                (key_id, fingerprint, key_type, created_date, last_rotated,
                 next_rotation, priority, owner, purpose, systems,
                 compliance_requirements)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                key_id, fingerprint, key_type, today,
                today, next_rotation.date().isoformat(), priority.value,
                owner, purpose, json.dumps(systems), json.dumps(compliance_reqs),
            ))

        # Schedule rotation
        self.schedule_rotation(key_id, next_rotation)
        return key_id

    def determine_compliance_requirements(self, purpose, systems):
        """Determine applicable compliance requirements.

        System names and the purpose text are matched case-insensitively
        against fixed keywords. Each framework is reported at most once,
        in first-match order.

        Returns:
            A list of dicts with ``framework``, ``requirement`` and
            ``max_rotation_days`` keys.
        """
        requirements = []
        seen = set()

        def add(framework, requirement, max_rotation_days):
            # Several systems may trigger the same framework; record it once.
            if framework in seen:
                return
            seen.add(framework)
            requirements.append({
                'framework': framework,
                'requirement': requirement,
                'max_rotation_days': max_rotation_days,
            })

        # Check system classifications
        for system in systems or ():
            name = system.lower()
            if 'payment' in name or 'pci' in name:
                add('PCI-DSS', '3.6.4', 90)
            if 'healthcare' in name or 'phi' in name:
                add('HIPAA', '164.312(a)(2)(iv)', 180)
            if 'financial' in name:
                add('SOX', 'Section 404', 90)

        # Check purpose-based requirements
        role = (purpose or '').lower()
        if 'root' in role or 'admin' in role:
            add('Best Practice', 'Privileged Access', 30)
        return requirements

    def schedule_rotation(self, key_id, rotation_date):
        """Schedule a key rotation unless one already exists for that date.

        Args:
            key_id: Id of a key in ``ssh_keys``.
            rotation_date: ``datetime`` (or ``date``) of the rotation.
        """
        day = rotation_date.date() if isinstance(rotation_date, datetime) else rotation_date
        scheduled = day.isoformat()

        # Check for existing schedule
        existing = self.conn.execute('''
            SELECT schedule_id FROM rotation_schedule
            WHERE key_id = ? AND scheduled_date = ?
        ''', (key_id, scheduled)).fetchone()
        if existing:
            return

        # Determine maintenance window
        maintenance_window = self.find_maintenance_window(key_id, rotation_date)
        with self.conn:
            # The window is a dict from the configuration; sqlite3 cannot
            # bind a dict, and generate_rotation_plan decodes this column
            # with json.loads, so store it as JSON text.
            self.conn.execute('''
                INSERT INTO rotation_schedule
                (key_id, scheduled_date, maintenance_window)
                VALUES (?, ?, ?)
            ''', (key_id, scheduled, json.dumps(maintenance_window)))

    def find_maintenance_window(self, key_id, preferred_date):
        """Find suitable maintenance window for rotation.

        Looks up the window configured for every system of the key (falling
        back to the ``default`` window) and returns the one with the
        smallest ``duration_hours``. ``preferred_date`` is currently unused.

        Returns:
            The selected window dict from ``config['maintenance_windows']``;
            the default window when the key is unknown or has no systems.
        """
        configured = self.config['maintenance_windows']
        row = self.conn.execute(
            'SELECT systems FROM ssh_keys WHERE key_id = ?', (key_id,)
        ).fetchone()
        systems = json.loads(row['systems']) if row else []

        # Check system maintenance windows
        windows = [
            configured[system] if system in configured else configured['default']
            for system in systems
        ]
        if not windows:
            return configured['default']
        # Find common window
        # For simplicity, return the most restrictive window
        return min(windows, key=lambda w: w['duration_hours'])

    def generate_rotation_plan(self, days_ahead=30):
        """Generate rotation plan for upcoming period.

        Args:
            days_ahead: Include unapproved rotations of active keys that are
                scheduled on or before today plus this many days.

        Returns:
            A JSON-serialisable dict with ``generated_date``,
            ``period_days``, ``rotations`` (ordered by priority, then date)
            and ``summary``.
        """
        now = datetime.now()
        end_date = (now + timedelta(days=days_ahead)).date().isoformat()
        rows = self.conn.execute('''
            SELECT k.*, s.scheduled_date, s.maintenance_window
            FROM ssh_keys k
            JOIN rotation_schedule s ON k.key_id = s.key_id
            WHERE s.scheduled_date <= ?
              AND s.approved = 0
              AND k.active = 1
            ORDER BY k.priority, s.scheduled_date
        ''', (end_date,)).fetchall()

        rotations = []
        for row in rows:
            window = row['maintenance_window']
            rotations.append({
                'key_id': row['key_id'],
                'owner': row['owner'],
                'purpose': row['purpose'],
                'priority': KeyPriority(row['priority']).name,
                'scheduled_date': row['scheduled_date'],
                'systems': json.loads(row['systems']),
                'compliance': json.loads(row['compliance_requirements']),
                # Tolerate schedule rows without a stored window.
                'maintenance_window': json.loads(window) if window else None,
                'rotation_count': row['rotation_count'],
                'risk_score': self.calculate_rotation_risk(row),
            })

        return {
            'generated_date': now.isoformat(),
            'period_days': days_ahead,
            'rotations': rotations,
            # Group by date
            'summary': self.summarize_rotation_plan(rotations),
        }

    def summarize_rotation_plan(self, rotations):
        """Summarize plan entries produced by ``generate_rotation_plan``.

        Returns:
            A dict with ``total_rotations``, ``by_date`` (scheduled date ->
            list of key ids, dates ascending), ``by_priority`` (priority
            name -> count) and ``highest_risk_score`` (0 for an empty plan).
        """
        by_date = {}
        by_priority = {}
        for rotation in rotations:
            by_date.setdefault(str(rotation['scheduled_date']), []).append(rotation['key_id'])
            by_priority[rotation['priority']] = by_priority.get(rotation['priority'], 0) + 1
        return {
            'total_rotations': len(rotations),
            'by_date': {day: by_date[day] for day in sorted(by_date)},
            'by_priority': by_priority,
            'highest_risk_score': max(
                (rotation['risk_score'] for rotation in rotations), default=0
            ),
        }

    def calculate_rotation_risk(self, key_info):
        """Calculate risk score for rotation.

        Args:
            key_info: Mapping (e.g. an ``ssh_keys`` row) with ``priority``,
                ``systems`` and ``compliance_requirements`` (JSON text) and
                ``last_rotated`` (``YYYY-MM-DD``).

        Returns:
            An integer score capped at 100.
        """
        # Priority-based risk: CRITICAL (1) contributes 80, LOW (4) 20.
        risk_score = (5 - key_info['priority']) * 20

        # System count risk
        risk_score += len(json.loads(key_info['systems'])) * 5

        # Time since last rotation. Only the date part is parsed so values
        # carrying a time component are accepted as well.
        last_rotated = datetime.strptime(str(key_info['last_rotated'])[:10], '%Y-%m-%d')
        days_since = (datetime.now() - last_rotated).days
        if days_since > 180:
            risk_score += 20
        elif days_since > 90:
            risk_score += 10

        # Compliance requirements
        risk_score += len(json.loads(key_info['compliance_requirements'])) * 10
        return min(100, risk_score)

    def send_rotation_notifications(self):
        """Send notifications for upcoming rotations.

        Selects schedules of active keys that are due within
        ``config['notification_days']`` and not yet notified, e-mails each
        owner and marks the schedule as notified.

        Returns:
            The number of notifications sent.
        """
        # Get rotations needing notification
        notification_days = self.config['notification_days']
        cutoff = (datetime.now() + timedelta(days=notification_days)).date().isoformat()
        rows = self.conn.execute('''
            SELECT k.*, s.scheduled_date, s.schedule_id
            FROM ssh_keys k
            JOIN rotation_schedule s ON k.key_id = s.key_id
            WHERE s.scheduled_date <= ?
              AND s.notification_sent = 0
              AND k.active = 1
        ''', (cutoff,)).fetchall()

        sent = 0
        for row in rows:
            self.send_notification(row)
            # Mark as notified and commit immediately: if a later send
            # fails, mails already delivered are not sent again next run.
            with self.conn:
                self.conn.execute('''
                    UPDATE rotation_schedule
                    SET notification_sent = 1
                    WHERE schedule_id = ?
                ''', (row['schedule_id'],))
            sent += 1
        return sent

    def format_compliance_requirements(self, compliance_requirements):
        """Render compliance requirements as one bullet line per entry.

        Args:
            compliance_requirements: JSON text as stored in
                ``ssh_keys.compliance_requirements`` or an already decoded
                list of requirement dicts.

        Returns:
            A newline-joined string; ``'- None'`` when nothing applies.
        """
        if not compliance_requirements:
            return '- None'
        if isinstance(compliance_requirements, str):
            compliance_requirements = json.loads(compliance_requirements)
        if not compliance_requirements:
            return '- None'
        return '\n'.join(
            f"- {req['framework']} {req['requirement']} "
            f"(rotate at least every {req['max_rotation_days']} days)"
            for req in compliance_requirements
        )

    def send_notification(self, rotation_info):
        """Send rotation notification email.

        Args:
            rotation_info: Row/mapping with the ``ssh_keys`` columns plus
                ``scheduled_date``. The ``owner`` value is used as the
                recipient address.
        """
        smtp_config = self.config['smtp']
        systems = ', '.join(json.loads(rotation_info['systems']))
        compliance = self.format_compliance_requirements(
            rotation_info['compliance_requirements']
        )
        approval_link = f"{self.config['approval_url']}/approve/{rotation_info['key_id']}"

        subject = f"SSH Key Rotation Required - {rotation_info['purpose']}"
        body = '\n'.join([
            '',
            'SSH Key Rotation Notification',
            'Key Details:',
            f"- Purpose: {rotation_info['purpose']}",
            f"- Owner: {rotation_info['owner']}",
            f"- Priority: {KeyPriority(rotation_info['priority']).name}",
            f"- Scheduled Date: {rotation_info['scheduled_date']}",
            f"- Affected Systems: {systems}",
            'Compliance Requirements:',
            compliance,
            'Action Required:',
            f"Please approve this rotation by visiting: {approval_link}",
            'For assistance, contact the security team.',
            '',
        ])

        msg = MIMEMultipart()
        msg['From'] = smtp_config['from_address']
        msg['To'] = rotation_info['owner']
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # Send email. A timeout keeps an unresponsive mail server from
        # blocking the notification run indefinitely.
        timeout = smtp_config.get('timeout', 30)
        with smtplib.SMTP(smtp_config['server'], smtp_config['port'], timeout=timeout) as server:
            if smtp_config.get('use_tls'):
                server.starttls()
            if smtp_config.get('username'):
                server.login(smtp_config['username'], smtp_config['password'])
            server.send_message(msg)

    def approve_rotation(self, key_id, approver):
        """Approve a scheduled rotation.

        Marks every not-yet-approved schedule of ``key_id`` as approved by
        ``approver`` with today's date.

        Returns:
            ``True`` if at least one schedule was updated, else ``False``.
        """
        with self.conn:
            cursor = self.conn.execute('''
                UPDATE rotation_schedule
                SET approved = 1, approved_by = ?, approval_date = ?
                WHERE key_id = ? AND approved = 0
            ''', (approver, datetime.now().date().isoformat(), key_id))
        return cursor.rowcount > 0
# Rotation planning CLI
def build_parser():
    """Build the argument parser for the rotation planner command line."""
    import argparse

    parser = argparse.ArgumentParser(description='SSH Key Rotation Planner')
    parser.add_argument('action', choices=['register', 'plan', 'notify', 'approve'])
    parser.add_argument('--fingerprint', help='Key fingerprint')
    parser.add_argument('--key-type', default='ed25519',
                        help='Key algorithm recorded for the key (default: ed25519)')
    parser.add_argument('--owner', help='Key owner')
    parser.add_argument('--purpose', help='Key purpose')
    parser.add_argument('--systems', nargs='+', help='Systems accessed')
    parser.add_argument('--priority', choices=['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'])
    parser.add_argument('--days', type=int, default=30, help='Days ahead for planning')
    parser.add_argument('--key-id', help='Key ID for approval')
    parser.add_argument('--approver', help='Approver name')
    parser.add_argument('--db', default='/var/lib/ssh-rotation/rotation.db',
                        help='Path of the rotation database')
    parser.add_argument('--config', default='/etc/ssh-rotation/config.json',
                        help='Path of the JSON configuration file')
    return parser


def main(argv=None):
    """Run the rotation planner CLI.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 when an approval matched no
        pending rotation. Invalid or incomplete arguments exit with
        status 2 via ``parser.error``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    # Each action needs its own options; reject incomplete invocations
    # before touching the database instead of failing on a None value.
    required = {
        'register': ('fingerprint', 'owner', 'purpose', 'systems', 'priority'),
        'approve': ('key_id', 'approver'),
    }
    missing = [
        '--' + name.replace('_', '-')
        for name in required.get(args.action, ())
        if getattr(args, name) is None
    ]
    if missing:
        parser.error(f"action '{args.action}' requires {', '.join(missing)}")

    planner = KeyRotationPlanner(args.db, args.config)
    try:
        if args.action == 'register':
            key_id = planner.register_key(
                args.fingerprint,
                args.key_type,
                args.owner,
                args.purpose,
                args.systems,
                KeyPriority[args.priority]
            )
            print(f"Registered key: {key_id}")
        elif args.action == 'plan':
            plan = planner.generate_rotation_plan(args.days)
            print(json.dumps(plan, indent=2))
        elif args.action == 'notify':
            planner.send_rotation_notifications()
            print("Notifications sent")
        elif args.action == 'approve':
            success = planner.approve_rotation(args.key_id, args.approver)
            print(f"Approval {'successful' if success else 'failed'}")
            if not success:
                return 1
        return 0
    finally:
        planner.conn.close()


if __name__ == '__main__':
    raise SystemExit(main())