## Key Rotation Strategies
Regular key rotation limits the impact of compromised keys and ensures compliance with security policies. Automated rotation systems handle the complexity of updating keys across distributed infrastructure without disrupting service.
Implement intelligent key rotation:
#!/usr/bin/env python3
# ssh-key-rotation.py
# Intelligent SSH key rotation system
import concurrent.futures
import json
import logging
import os
import shlex
from datetime import datetime, timedelta

import paramiko
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
class SSHKeyRotationManager:
    """Policy-driven rotation of SSH keys across a set of hosts.

    The manager reads a JSON configuration file and uses these keys:

    * ``rotation_policies`` (required): list of dicts with ``key_types`` and
      ``max_age_days``.
    * ``deployment_user`` / ``deployment_key``: credentials used to push new
      public keys to hosts.
    * ``max_usage_count`` (optional): usage threshold that triggers rotation;
      the usage check is skipped when it is absent.
    * ``log_file`` (optional): log destination, defaults to
      ``/var/log/ssh-key-rotation.log``.
    * ``private_key_dir`` (optional): directory where newly generated private
      keys are written, defaults to ``~/.ssh``.

    NOTE(review): the methods ``get_active_keys``, ``update_key_repository``,
    ``notify_rotation_complete``, ``rollback_rotation``,
    ``store_scheduled_task`` and ``send_rotation_failure_alert`` are called
    below but are not defined in this class; they are presumably supplied by
    a subclass or mixin -- confirm before use.
    """

    DEFAULT_LOG_FILE = '/var/log/ssh-key-rotation.log'

    def __init__(self, config_file):
        """Load configuration from *config_file* (JSON) and set up logging."""
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.logger = self._setup_logging()
        self.rotation_policies = self.config['rotation_policies']
        self.notification_handlers = []

    def _setup_logging(self):
        """Return the shared ``ssh_key_rotation`` logger, configured once.

        ``logging.getLogger`` returns the same object for the same name, so a
        handler is attached only if the logger has none yet; otherwise every
        new manager instance would duplicate each log line.
        """
        logger = logging.getLogger('ssh_key_rotation')
        if not logger.handlers:
            log_file = self.config.get('log_file', self.DEFAULT_LOG_FILE)
            handler = logging.FileHandler(log_file)
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger

    def evaluate_rotation_need(self, key_metadata):
        """Determine if a key needs rotation based on policies.

        Args:
            key_metadata: dict with ``created_date`` (a ``datetime``) and
                ``key_type``; may also carry ``usage_count`` and the
                compromise flags read by ``check_compromise_indicators``.

        Returns:
            ``(True, reason)`` when rotation is required, otherwise
            ``(False, None)``.
        """
        key_age = (datetime.now() - key_metadata['created_date']).days

        # Check age-based rotation
        for policy in self.rotation_policies:
            if key_metadata['key_type'] not in policy['key_types']:
                continue
            if key_age >= policy['max_age_days']:
                return True, (
                    f"Key age ({key_age} days) exceeds policy maximum "
                    f"({policy['max_age_days']} days)"
                )

        # Check usage-based rotation (skipped when no limit is configured)
        max_usage = self.config.get('max_usage_count')
        usage_count = key_metadata.get('usage_count', 0)
        if max_usage is not None and usage_count > max_usage:
            return True, (
                f"Usage count ({usage_count}) exceeds maximum ({max_usage})"
            )

        # Check compromise indicators
        if self.check_compromise_indicators(key_metadata):
            return True, "Potential compromise detected"

        return False, None

    def check_compromise_indicators(self, key_metadata):
        """Return True if any compromise flag in *key_metadata* is truthy.

        The flags examined are ``access_from_unusual_location``,
        ``concurrent_usage_detected`` and ``failed_auth_spike``.
        """
        indicator_flags = (
            'access_from_unusual_location',  # unusual access patterns
            'concurrent_usage_detected',     # concurrent use from several places
            'failed_auth_spike',             # failed authentication spike
        )
        return any(key_metadata.get(flag) for flag in indicator_flags)

    def rotate_key(self, old_key_id, key_metadata):
        """Generate a replacement Ed25519 key, deploy it, test it and retire the old one.

        Args:
            old_key_id: identifier of the key being replaced.
            key_metadata: dict with ``owner`` and ``deployed_hosts``; an
                optional ``rotation_reason`` defaults to ``'scheduled'``.

        Returns:
            True when the new key was deployed and verified on every host,
            False on test failure (after calling ``rollback_rotation``) or on
            any exception.
        """
        self.logger.info("Starting rotation for key %s", old_key_id)
        try:
            # Generate new key pair
            new_private_key = ed25519.Ed25519PrivateKey.generate()
            new_public_key = new_private_key.public_key()

            # Serialize keys
            new_private_pem = new_private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.OpenSSH,
                encryption_algorithm=serialization.NoEncryption()
            )
            new_public_ssh = new_public_key.public_bytes(
                encoding=serialization.Encoding.OpenSSH,
                format=serialization.PublicFormat.OpenSSH
            )

            # One timestamp for both the id and created_date so they agree.
            now = datetime.now()
            new_key_id = f"rotated-{old_key_id}-{now.strftime('%Y%m%d%H%M%S')}"

            # Persist the private key; test_new_key authenticates with it and
            # without this step the generated key would simply be discarded.
            private_key_path = self._store_private_key(
                new_key_id, new_private_pem
            )

            # Create new key metadata
            new_metadata = {
                'key_id': new_key_id,
                'old_key_id': old_key_id,
                'public_key': new_public_ssh.decode(),
                'private_key_path': private_key_path,
                'created_date': now,
                'rotation_reason': key_metadata.get('rotation_reason', 'scheduled'),
                'owner': key_metadata['owner']
            }

            # Deploy new key in parallel with old key
            self.deploy_new_key(new_metadata, key_metadata['deployed_hosts'])

            # Test new key
            if not self.test_new_key(new_metadata, key_metadata['deployed_hosts']):
                self.logger.error("New key testing failed for %s", new_key_id)
                self.rollback_rotation(new_key_id, old_key_id)
                return False

            # Schedule old key removal
            self.schedule_old_key_removal(old_key_id, delay_hours=24)
            # Update key repository
            self.update_key_repository(new_metadata)
            # Notify stakeholders
            self.notify_rotation_complete(old_key_id, new_key_id)
            self.logger.info(
                "Successfully rotated key %s to %s", old_key_id, new_key_id
            )
            return True
        except Exception as e:
            # Top-level boundary for one rotation: log with traceback and
            # report failure so the caller can alert.
            self.logger.exception("Key rotation failed: %s", str(e))
            return False

    def _store_private_key(self, key_id, private_pem):
        """Write *private_pem* to a new owner-only (0600) file and return its path."""
        key_dir = self.config.get(
            'private_key_dir', os.path.expanduser('~/.ssh')
        )
        os.makedirs(key_dir, mode=0o700, exist_ok=True)
        key_path = os.path.join(key_dir, key_id)
        # O_EXCL: never overwrite an existing key file; the mode is applied at
        # creation so the secret is never briefly world-readable.
        fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, 'wb') as key_file:
            key_file.write(private_pem)
        return key_path

    def _open_ssh_client(self, hostname, username, key_filename):
        """Return a connected paramiko client; the caller must close it."""
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification, which permits man-in-the-middle attacks. Kept for
        # backward compatibility; prefer known_hosts plus RejectPolicy.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(
                hostname,
                username=username,
                key_filename=key_filename
            )
        except Exception:
            client.close()
            raise
        return client

    def deploy_new_key(self, new_key_metadata, target_hosts, max_workers=10):
        """Deploy the new public key to every host in parallel.

        Args:
            new_key_metadata: metadata dict containing ``public_key``.
            target_hosts: iterable of host dicts, each with ``hostname``.
            max_workers: size of the thread pool used for deployment.

        Returns:
            List of hostnames on which deployment failed (empty on success).
        """
        future_to_host = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            for host in target_hosts:
                future = executor.submit(
                    self._deploy_to_host, new_key_metadata, host
                )
                # Futures carry no reference to their input, so remember it.
                future_to_host[future] = host
        # Leaving the executor context waits for all deployments.

        # Check results
        failed_hosts = []
        for future, host in future_to_host.items():
            try:
                succeeded = future.result()
            except Exception:
                self.logger.exception(
                    "Deployment task crashed for %s", host['hostname']
                )
                succeeded = False
            if not succeeded:
                failed_hosts.append(host['hostname'])

        if failed_hosts:
            self.logger.warning(f"Failed to deploy to hosts: {failed_hosts}")
        return failed_hosts

    def _deploy_to_host(self, key_metadata, host):
        """Append the new public key to ``authorized_keys`` on one host.

        Connects as the configured ``deployment_user``.

        NOTE(review): the key is added to the deployment user's
        ``authorized_keys`` while ``test_new_key`` logs in as
        ``host['username']``; these presumably refer to the same account --
        confirm.

        Returns:
            True if the remote command exited with status 0, else False.
        """
        try:
            # Connect using deployment credentials
            client = self._open_ssh_client(
                host['hostname'],
                self.config['deployment_user'],
                self.config['deployment_key']
            )
            try:
                # Add new key to authorized_keys (quoted for the remote shell)
                quoted_key = shlex.quote(key_metadata['public_key'])
                command = f"echo {quoted_key} >> ~/.ssh/authorized_keys"
                _stdin, stdout, stderr = client.exec_command(command)
                # Judge success by exit status; stderr may hold mere warnings.
                exit_status = stdout.channel.recv_exit_status()
                if exit_status != 0:
                    error_text = stderr.read().decode(errors='replace').strip()
                    self.logger.error(
                        "Deploy command failed on %s (exit %s): %s",
                        host['hostname'], exit_status, error_text
                    )
                    return False
                return True
            finally:
                client.close()
        except Exception as e:
            self.logger.error(
                "Failed to deploy to %s: %s", host['hostname'], str(e)
            )
            return False

    def test_new_key(self, key_metadata, target_hosts):
        """Verify that the new key authenticates on all target hosts.

        Args:
            key_metadata: metadata dict containing ``private_key_path``.
            target_hosts: iterable of host dicts with ``hostname`` and
                ``username``.

        Returns:
            True only if an echo command succeeds on every host. Every host
            is tested (no short-circuit) so each failure gets logged.
        """
        private_key_path = key_metadata.get('private_key_path')
        if not private_key_path:
            self.logger.error(
                "Cannot test key %s: no private_key_path in metadata",
                key_metadata.get('key_id')
            )
            return False

        expected_output = "SSH key test successful"
        test_results = []
        for host in target_hosts:
            try:
                # Test connection with new key
                client = self._open_ssh_client(
                    host['hostname'], host['username'], private_key_path
                )
                try:
                    # Execute test command
                    _stdin, stdout, _stderr = client.exec_command(
                        'echo "SSH key test successful"'
                    )
                    result = stdout.read().decode().strip()
                finally:
                    client.close()
                test_results.append(result == expected_output)
            except Exception as e:
                self.logger.error(
                    "Key test failed on %s: %s", host['hostname'], str(e)
                )
                test_results.append(False)
        return all(test_results)

    def schedule_old_key_removal(self, old_key_id, delay_hours):
        """Queue a ``remove_key`` task for *old_key_id* after *delay_hours*.

        The delay keeps the old key valid while clients switch to the new one.
        """
        removal_time = datetime.now() + timedelta(hours=delay_hours)

        # Create scheduled task
        task = {
            'task_type': 'remove_key',
            'key_id': old_key_id,
            'scheduled_time': removal_time,
            'status': 'pending'
        }

        # Store in task queue
        self.store_scheduled_task(task)
        self.logger.info(
            "Scheduled removal of key %s at %s", old_key_id, removal_time
        )

    def automated_rotation_cycle(self):
        """Evaluate every active key and rotate those that need it.

        A key record that cannot be evaluated is logged and skipped so one
        malformed record does not abort the whole cycle. A failed rotation
        triggers ``send_rotation_failure_alert``.
        """
        self.logger.info("Starting automated rotation cycle")

        # Get all active keys
        active_keys = self.get_active_keys()
        for key in active_keys:
            try:
                needs_rotation, reason = self.evaluate_rotation_need(key)
            except (KeyError, TypeError) as e:
                self.logger.error(
                    "Could not evaluate key %s: %r", key.get('key_id'), e
                )
                continue

            if not needs_rotation:
                continue

            self.logger.info(
                "Key %s needs rotation: %s", key['key_id'], reason
            )
            key['rotation_reason'] = reason

            # Perform rotation
            if self.rotate_key(key['key_id'], key):
                self.logger.info("Successfully rotated key %s", key['key_id'])
            else:
                self.logger.error("Failed to rotate key %s", key['key_id'])
                # Send alert
                self.send_rotation_failure_alert(key['key_id'], reason)
Effective SSH key management requires comprehensive lifecycle controls, centralized repositories, automated distribution, and intelligent rotation strategies. By implementing these systems, organizations maintain strong security while enabling seamless access for authorized users. Regular auditing and continuous improvement ensure key management practices evolve with changing security requirements and organizational needs.

## SSH Security Audit Checklist
Regular SSH security audits ensure configurations remain secure and compliant with organizational policies. A comprehensive audit examines every aspect of SSH deployment, from basic configurations to advanced security features. This detailed checklist provides a systematic approach to auditing SSH implementations, helping identify vulnerabilities before they can be exploited. By following this guide, security teams can maintain robust SSH security postures across their infrastructure.