Compliance and Audit Trail

Compliance and Audit Trail

Key rotation must maintain comprehensive audit trails to demonstrate compliance with security policies and regulatory requirements. Every rotation event, from planning through execution, requires detailed documentation that proves adherence to established procedures.

Implement rotation audit system:

#!/usr/bin/env python3
# rotation-audit-system.py
# Comprehensive audit trail for SSH key rotations

import json
import hashlib
import sqlite3
from datetime import datetime, timedelta
import cryptography.hazmat.primitives.serialization as serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import pandas as pd
from jinja2 import Template

class RotationAuditSystem:
    def __init__(self, audit_db_path):
        self.conn = sqlite3.connect(audit_db_path)
        self.conn.row_factory = sqlite3.Row
        self._init_audit_tables()
        
    def _init_audit_tables(self):
        """Initialize audit trail database"""
        cursor = self.conn.cursor()
        
        # Main audit trail
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS rotation_audit_trail (
            audit_id INTEGER PRIMARY KEY AUTOINCREMENT,
            rotation_id TEXT UNIQUE,
            key_identifier TEXT,
            rotation_type TEXT,
            initiated_by TEXT,
            initiated_at TIMESTAMP,
            completed_at TIMESTAMP,
            status TEXT,
            old_key_fingerprint TEXT,
            new_key_fingerprint TEXT,
            systems_affected INTEGER,
            compliance_frameworks TEXT,
            approval_chain TEXT,
            risk_assessment TEXT,
            rollback_performed BOOLEAN DEFAULT 0
        )''')
        
        # Detailed rotation steps
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS rotation_steps (
            step_id INTEGER PRIMARY KEY AUTOINCREMENT,
            rotation_id TEXT,
            step_name TEXT,
            step_timestamp TIMESTAMP,
            status TEXT,
            details TEXT,
            performed_by TEXT,
            duration_seconds INTEGER,
            error_message TEXT,
            FOREIGN KEY(rotation_id) REFERENCES rotation_audit_trail(rotation_id)
        )''')
        
        # System update tracking
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS system_updates (
            update_id INTEGER PRIMARY KEY AUTOINCREMENT,
            rotation_id TEXT,
            system_name TEXT,
            update_timestamp TIMESTAMP,
            update_type TEXT,
            old_key_present BOOLEAN,
            new_key_present BOOLEAN,
            update_successful BOOLEAN,
            error_details TEXT,
            FOREIGN KEY(rotation_id) REFERENCES rotation_audit_trail(rotation_id)
        )''')
        
        # Compliance validations
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS compliance_validations (
            validation_id INTEGER PRIMARY KEY AUTOINCREMENT,
            rotation_id TEXT,
            framework TEXT,
            requirement TEXT,
            validation_timestamp TIMESTAMP,
            validation_result TEXT,
            evidence TEXT,
            validated_by TEXT,
            FOREIGN KEY(rotation_id) REFERENCES rotation_audit_trail(rotation_id)
        )''')
        
        self.conn.commit()
        
    def start_rotation_audit(self, rotation_id, key_identifier, rotation_type, 
                           initiated_by, compliance_frameworks):
        """Begin audit trail for new rotation"""
        cursor = self.conn.cursor()
        
        # Create main audit record
        cursor.execute('''
        INSERT INTO rotation_audit_trail 
        (rotation_id, key_identifier, rotation_type, initiated_by, 
         initiated_at, status, compliance_frameworks)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            rotation_id, key_identifier, rotation_type, initiated_by,
            datetime.now(), 'in_progress', json.dumps(compliance_frameworks)
        ))
        
        # Log initiation step
        self.log_rotation_step(
            rotation_id, 
            'initiation',
            'started',
            {'reason': rotation_type, 'approvals_required': len(compliance_frameworks)},
            initiated_by
        )
        
        self.conn.commit()
        
    def log_rotation_step(self, rotation_id, step_name, status, details, performed_by):
        """Log individual rotation step"""
        cursor = self.conn.cursor()
        
        step_start = datetime.now()
        
        cursor.execute('''
        INSERT INTO rotation_steps 
        (rotation_id, step_name, step_timestamp, status, details, performed_by)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            rotation_id, step_name, step_start, status, 
            json.dumps(details), performed_by
        ))
        
        self.conn.commit()
        
        return cursor.lastrowid
        
    def update_step_completion(self, step_id, status, error_message=None):
        """Update step completion status"""
        cursor = self.conn.cursor()
        
        # Get step start time
        cursor.execute('SELECT step_timestamp FROM rotation_steps WHERE step_id = ?', (step_id,))
        result = cursor.fetchone()
        
        if result:
            duration = (datetime.now() - datetime.fromisoformat(result['step_timestamp'])).seconds
            
            cursor.execute('''
            UPDATE rotation_steps 
            SET status = ?, duration_seconds = ?, error_message = ?
            WHERE step_id = ?
            ''', (status, duration, error_message, step_id))
            
            self.conn.commit()
            
    def log_system_update(self, rotation_id, system_name, update_type, 
                         old_key_present, new_key_present, successful, error=None):
        """Log system-level update"""
        cursor = self.conn.cursor()
        
        cursor.execute('''
        INSERT INTO system_updates 
        (rotation_id, system_name, update_timestamp, update_type,
         old_key_present, new_key_present, update_successful, error_details)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            rotation_id, system_name, datetime.now(), update_type,
            old_key_present, new_key_present, successful, error
        ))
        
        self.conn.commit()
        
    def validate_compliance(self, rotation_id, framework, requirement, result, evidence, validator):
        """Record compliance validation"""
        cursor = self.conn.cursor()
        
        cursor.execute('''
        INSERT INTO compliance_validations
        (rotation_id, framework, requirement, validation_timestamp,
         validation_result, evidence, validated_by)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            rotation_id, framework, requirement, datetime.now(),
            result, json.dumps(evidence), validator
        ))
        
        self.conn.commit()
        
    def complete_rotation_audit(self, rotation_id, status, old_fingerprint, 
                               new_fingerprint, systems_affected):
        """Complete rotation audit trail"""
        cursor = self.conn.cursor()
        
        cursor.execute('''
        UPDATE rotation_audit_trail
        SET completed_at = ?, status = ?, old_key_fingerprint = ?,
            new_key_fingerprint = ?, systems_affected = ?
        WHERE rotation_id = ?
        ''', (
            datetime.now(), status, old_fingerprint, 
            new_fingerprint, systems_affected, rotation_id
        ))
        
        self.conn.commit()
        
    def generate_compliance_report(self, start_date, end_date, framework=None):
        """Generate compliance report for rotations"""
        cursor = self.conn.cursor()
        
        # Get rotations in period
        query = '''
        SELECT * FROM rotation_audit_trail
        WHERE initiated_at BETWEEN ? AND ?
        '''
        
        params = [start_date, end_date]
        
        if framework:
            query += ' AND compliance_frameworks LIKE ?'
            params.append(f'%{framework}%')
            
        cursor.execute(query, params)
        
        rotations = cursor.fetchall()
        
        report = {
            'period': {'start': start_date, 'end': end_date},
            'framework': framework or 'All',
            'summary': {
                'total_rotations': len(rotations),
                'successful': sum(1 for r in rotations if r['status'] == 'completed'),
                'failed': sum(1 for r in rotations if r['status'] == 'failed'),
                'rollbacks': sum(1 for r in rotations if r['rollback_performed'])
            },
            'rotations': []
        }
        
        # Detailed rotation info
        for rotation in rotations:
            rotation_detail = dict(rotation)
            
            # Get compliance validations
            cursor.execute('''
            SELECT * FROM compliance_validations 
            WHERE rotation_id = ?
            ''', (rotation['rotation_id'],))
            
            rotation_detail['validations'] = [dict(v) for v in cursor.fetchall()]
            
            # Get step timeline
            cursor.execute('''
            SELECT step_name, status, duration_seconds 
            FROM rotation_steps 
            WHERE rotation_id = ?
            ORDER BY step_timestamp
            ''', (rotation['rotation_id'],))
            
            rotation_detail['steps'] = [dict(s) for s in cursor.fetchall()]
            
            report['rotations'].append(rotation_detail)
            
        return report
        
    def generate_audit_certificate(self, rotation_id):
        """Generate cryptographic proof of rotation"""
        cursor = self.conn.cursor()
        
        # Get all rotation data
        cursor.execute('''
        SELECT * FROM rotation_audit_trail WHERE rotation_id = ?
        ''', (rotation_id,))
        
        rotation_data = dict(cursor.fetchone())
        
        # Get all steps
        cursor.execute('''
        SELECT * FROM rotation_steps WHERE rotation_id = ?
        ORDER BY step_timestamp
        ''', (rotation_id,))
        
        steps = [dict(row) for row in cursor.fetchall()]
        
        # Create certificate content
        certificate_data = {
            'rotation_id': rotation_id,
            'rotation_data': rotation_data,
            'steps': steps,
            'generated_at': datetime.now().isoformat()
        }
        
        # Calculate hash of certificate
        cert_json = json.dumps(certificate_data, sort_keys=True)
        cert_hash = hashlib.sha256(cert_json.encode()).hexdigest()
        
        certificate = {
            'certificate_id': f"CERT-{rotation_id}",
            'data': certificate_data,
            'hash': cert_hash,
            'algorithm': 'SHA256'
        }
        
        return certificate
        
    def export_audit_data(self, output_format='json', filters=None):
        """Export audit data for external analysis"""
        cursor = self.conn.cursor()
        
        # Build query with filters
        query = 'SELECT * FROM rotation_audit_trail WHERE 1=1'
        params = []
        
        if filters:
            if 'start_date' in filters:
                query += ' AND initiated_at >= ?'
                params.append(filters['start_date'])
            if 'end_date' in filters:
                query += ' AND initiated_at <= ?'
                params.append(filters['end_date'])
            if 'status' in filters:
                query += ' AND status = ?'
                params.append(filters['status'])
                
        # Execute query
        df = pd.read_sql_query(query, self.conn, params=params)
        
        if output_format == 'json':
            return df.to_json(orient='records', date_format='iso')
        elif output_format == 'csv':
            return df.to_csv(index=False)
        elif output_format == 'html':
            return self.generate_html_report(df)
            
    def generate_html_report(self, df):
        """Generate HTML audit report"""
        template = Template('''
        <!DOCTYPE html>
        <html>
        <head>
            <title>SSH Key Rotation Audit Report</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 40px; }
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #4CAF50; color: white; }
                tr:nth-child(even) { background-color: #f2f2f2; }
                .success { color: green; }
                .failed { color: red; }
                .summary { background-color: #e7f3fe; padding: 20px; margin-bottom: 20px; }
            </style>
        </head>
        <body>
            <h1>SSH Key Rotation Audit Report</h1>
            <div class="summary">
                <h2>Summary</h2>
                <p>Total Rotations: {{ total_rotations }}</p>
                <p>Successful: <span class="success">{{ successful_rotations }}</span></p>
                <p>Failed: <span class="failed">{{ failed_rotations }}</span></p>
                <p>Report Generated: {{ generation_time }}</p>
            </div>
            
            <h2>Rotation Details</h2>
            <table>
                <thead>
                    <tr>
                        <th>Rotation ID</th>
                        <th>Key Identifier</th>
                        <th>Initiated By</th>
                        <th>Started</th>
                        <th>Completed</th>
                        <th>Status</th>
                        <th>Systems</th>
                    </tr>
                </thead>
                <tbody>
                {% for _, row in rotations.iterrows() %}
                    <tr>
                        <td>{{ row.rotation_id }}</td>
                        <td>{{ row.key_identifier }}</td>
                        <td>{{ row.initiated_by }}</td>
                        <td>{{ row.initiated_at }}</td>
                        <td>{{ row.completed_at or 'In Progress' }}</td>
                        <td class="{{ 'success' if row.status == 'completed' else 'failed' }}">
                            {{ row.status }}
                        </td>
                        <td>{{ row.systems_affected }}</td>
                    </tr>
                {% endfor %}
                </tbody>
            </table>
        </body>
        </html>
        ''')
        
        return template.render(
            total_rotations=len(df),
            successful_rotations=len(df[df['status'] == 'completed']),
            failed_rotations=len(df[df['status'] == 'failed']),
            generation_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            rotations=df
        )

# CLI interface
if __name__ == '__main__':
    import argparse
    
    parser = argparse.ArgumentParser(description='SSH Key Rotation Audit System')
    parser.add_argument('action', choices=['start', 'log', 'complete', 'report', 'export'])
    parser.add_argument('--rotation-id', help='Rotation ID')
    parser.add_argument('--key-id', help='Key identifier')
    parser.add_argument('--type', help='Rotation type')
    parser.add_argument('--user', help='User performing action')
    parser.add_argument('--framework', help='Compliance framework')
    parser.add_argument('--format', choices=['json', 'csv', 'html'], default='json')
    parser.add_argument('--start-date', help='Start date for reports')
    parser.add_argument('--end-date', help='End date for reports')
    
    args = parser.parse_args()
    
    audit_system = RotationAuditSystem('/var/lib/ssh-rotation/audit.db')
    
    if args.action == 'start':
        audit_system.start_rotation_audit(
            args.rotation_id,
            args.key_id,
            args.type,
            args.user,
            [args.framework] if args.framework else []
        )
        print(f"Started audit trail for rotation: {args.rotation_id}")
        
    elif args.action == 'report':
        report = audit_system.generate_compliance_report(
            args.start_date,
            args.end_date,
            args.framework
        )
        print(json.dumps(report, indent=2))
        
    elif args.action == 'export':
        data = audit_system.export_audit_data(
            output_format=args.format,
            filters={
                'start_date': args.start_date,
                'end_date': args.end_date
            }
        )
        print(data)

Effective SSH key rotation strategies combine careful planning, automated implementation, and comprehensive compliance tracking. By implementing these systems, organizations maintain strong security postures while meeting regulatory requirements and minimizing operational disruption. Regular rotation becomes a routine process rather than a disruptive event, ensuring keys remain secure throughout their lifecycle.## How to Secure SSH Server

Securing an SSH server requires a multi-layered approach that goes beyond default configurations. A properly hardened SSH server resists brute-force attacks, prevents unauthorized access, and maintains detailed audit trails while providing reliable service to legitimate users. This comprehensive guide walks through essential security configurations, from basic hardening to advanced protective measures that transform your SSH server into a fortress against modern threats.