Key Rotation Strategies

Key Rotation Strategies

Regular key rotation limits the impact of compromised keys and ensures compliance with security policies. Automated rotation systems handle the complexity of updating keys across distributed infrastructure without disrupting service.

Implement intelligent key rotation:

#!/usr/bin/env python3
# ssh-key-rotation.py
# Intelligent SSH key rotation system

import os
import json
import logging
from datetime import datetime, timedelta
import paramiko
import concurrent.futures
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519

class SSHKeyRotationManager:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
            
        self.logger = self._setup_logging()
        self.rotation_policies = self.config['rotation_policies']
        self.notification_handlers = []
        
    def _setup_logging(self):
        logger = logging.getLogger('ssh_key_rotation')
        handler = logging.FileHandler('/var/log/ssh-key-rotation.log')
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger
        
    def evaluate_rotation_need(self, key_metadata):
        """Determine if key needs rotation based on policies"""
        key_age = (datetime.now() - key_metadata['created_date']).days
        
        # Check age-based rotation
        for policy in self.rotation_policies:
            if key_metadata['key_type'] in policy['key_types']:
                if key_age >= policy['max_age_days']:
                    return True, f"Key age ({key_age} days) exceeds policy maximum ({policy['max_age_days']} days)"
                    
        # Check usage-based rotation
        if key_metadata.get('usage_count', 0) > self.config['max_usage_count']:
            return True, f"Usage count exceeds maximum ({key_metadata['usage_count']})"
            
        # Check compromise indicators
        if self.check_compromise_indicators(key_metadata):
            return True, "Potential compromise detected"
            
        return False, None
        
    def check_compromise_indicators(self, key_metadata):
        """Check for signs of key compromise"""
        indicators = []
        
        # Unusual access patterns
        if key_metadata.get('access_from_unusual_location'):
            indicators.append('unusual_location')
            
        # Concurrent usage from multiple locations
        if key_metadata.get('concurrent_usage_detected'):
            indicators.append('concurrent_usage')
            
        # Failed authentication spike
        if key_metadata.get('failed_auth_spike'):
            indicators.append('auth_failures')
            
        return len(indicators) > 0
        
    def rotate_key(self, old_key_id, key_metadata):
        """Perform key rotation"""
        self.logger.info(f"Starting rotation for key {old_key_id}")
        
        try:
            # Generate new key pair
            new_private_key = ed25519.Ed25519PrivateKey.generate()
            new_public_key = new_private_key.public_key()
            
            # Serialize keys
            new_private_pem = new_private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.OpenSSH,
                encryption_algorithm=serialization.NoEncryption()
            )
            
            new_public_ssh = new_public_key.public_bytes(
                encoding=serialization.Encoding.OpenSSH,
                format=serialization.PublicFormat.OpenSSH
            )
            
            # Create new key metadata
            new_key_id = f"rotated-{old_key_id}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
            new_metadata = {
                'key_id': new_key_id,
                'old_key_id': old_key_id,
                'public_key': new_public_ssh.decode(),
                'created_date': datetime.now(),
                'rotation_reason': key_metadata.get('rotation_reason', 'scheduled'),
                'owner': key_metadata['owner']
            }
            
            # Deploy new key in parallel with old key
            self.deploy_new_key(new_metadata, key_metadata['deployed_hosts'])
            
            # Test new key
            if self.test_new_key(new_metadata, key_metadata['deployed_hosts']):
                # Schedule old key removal
                self.schedule_old_key_removal(old_key_id, delay_hours=24)
                
                # Update key repository
                self.update_key_repository(new_metadata)
                
                # Notify stakeholders
                self.notify_rotation_complete(old_key_id, new_key_id)
                
                self.logger.info(f"Successfully rotated key {old_key_id} to {new_key_id}")
                return True
            else:
                self.logger.error(f"New key testing failed for {new_key_id}")
                self.rollback_rotation(new_key_id, old_key_id)
                return False
                
        except Exception as e:
            self.logger.error(f"Key rotation failed: {str(e)}")
            return False
            
    def deploy_new_key(self, new_key_metadata, target_hosts):
        """Deploy new key to target hosts"""
        deployment_tasks = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            for host in target_hosts:
                task = executor.submit(self._deploy_to_host, new_key_metadata, host)
                deployment_tasks.append(task)
                
            # Wait for all deployments
            results = concurrent.futures.wait(deployment_tasks)
            
            # Check results
            failed_hosts = []
            for future in results.done:
                if not future.result():
                    failed_hosts.append(future.host)
                    
            if failed_hosts:
                self.logger.warning(f"Failed to deploy to hosts: {failed_hosts}")
                
    def _deploy_to_host(self, key_metadata, host):
        """Deploy key to single host"""
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            # Connect using deployment credentials
            ssh.connect(
                host['hostname'],
                username=self.config['deployment_user'],
                key_filename=self.config['deployment_key']
            )
            
            # Add new key to authorized_keys
            command = f"echo '{key_metadata['public_key']}' >> ~/.ssh/authorized_keys"
            stdin, stdout, stderr = ssh.exec_command(command)
            
            if stderr.read():
                return False
                
            ssh.close()
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to deploy to {host['hostname']}: {str(e)}")
            return False
            
    def test_new_key(self, key_metadata, target_hosts):
        """Test new key on all target hosts"""
        test_results = []
        
        for host in target_hosts:
            try:
                ssh = paramiko.SSHClient()
                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                
                # Test connection with new key
                ssh.connect(
                    host['hostname'],
                    username=host['username'],
                    key_filename=key_metadata['private_key_path']
                )
                
                # Execute test command
                stdin, stdout, stderr = ssh.exec_command('echo "SSH key test successful"')
                result = stdout.read().decode().strip()
                
                ssh.close()
                test_results.append(result == "SSH key test successful")
                
            except Exception as e:
                self.logger.error(f"Key test failed on {host['hostname']}: {str(e)}")
                test_results.append(False)
                
        return all(test_results)
        
    def schedule_old_key_removal(self, old_key_id, delay_hours):
        """Schedule removal of old key after delay"""
        removal_time = datetime.now() + timedelta(hours=delay_hours)
        
        # Create scheduled task
        task = {
            'task_type': 'remove_key',
            'key_id': old_key_id,
            'scheduled_time': removal_time,
            'status': 'pending'
        }
        
        # Store in task queue
        self.store_scheduled_task(task)
        
        self.logger.info(f"Scheduled removal of key {old_key_id} at {removal_time}")
        
    def automated_rotation_cycle(self):
        """Run automated rotation cycle"""
        self.logger.info("Starting automated rotation cycle")
        
        # Get all active keys
        active_keys = self.get_active_keys()
        
        for key in active_keys:
            needs_rotation, reason = self.evaluate_rotation_need(key)
            
            if needs_rotation:
                self.logger.info(f"Key {key['key_id']} needs rotation: {reason}")
                key['rotation_reason'] = reason
                
                # Perform rotation
                if self.rotate_key(key['key_id'], key):
                    self.logger.info(f"Successfully rotated key {key['key_id']}")
                else:
                    self.logger.error(f"Failed to rotate key {key['key_id']}")
                    
                    # Send alert
                    self.send_rotation_failure_alert(key['key_id'], reason)

Effective SSH key management requires comprehensive lifecycle controls, centralized repositories, automated distribution, and intelligent rotation strategies. By implementing these systems, organizations maintain strong security while enabling seamless access for authorized users. Regular auditing and continuous improvement ensure key management practices evolve with changing security requirements and organizational needs.## SSH Security Audit Checklist

Regular SSH security audits ensure configurations remain secure and compliant with organizational policies. A comprehensive audit examines every aspect of SSH deployment, from basic configurations to advanced security features. This detailed checklist provides a systematic approach to auditing SSH implementations, helping identify vulnerabilities before they can be exploited. By following this guide, security teams can maintain robust SSH security postures across their infrastructure.