Monitoring and Compliance

Monitoring and Compliance

Continuous monitoring ensures that hardening measures remain effective and that the SSH configuration stays compliant with security policies. Implement comprehensive logging and monitoring that provide visibility into SSH activity.

Deploy advanced SSH monitoring:

#!/usr/bin/env python3
# ssh-monitor.py
# Advanced SSH monitoring and compliance checking

import re
import os
import json
import time
import hashlib
from datetime import datetime, timedelta
import psutil
import subprocess

class SSHMonitor:
    def __init__(self):
        self.config_baseline = self.create_baseline()
        self.active_sessions = {}
        self.compliance_failures = []
        
    def create_baseline(self):
        """Create configuration baseline for comparison"""
        baseline = {
            'config_hash': self.hash_file('/etc/ssh/sshd_config'),
            'required_settings': {
                'PermitRootLogin': 'no',
                'PasswordAuthentication': 'no',
                'PubkeyAuthentication': 'yes',
                'PermitEmptyPasswords': 'no',
                'Protocol': '2',
                'StrictModes': 'yes',
                'MaxAuthTries': '3',
                'ClientAliveInterval': '300',
                'ClientAliveCountMax': '2'
            },
            'forbidden_users': ['root', 'admin', 'guest'],
            'required_algorithms': {
                'ciphers': ['[email protected]', '[email protected]'],
                'macs': ['[email protected]', '[email protected]'],
                'kex': ['curve25519-sha256', '[email protected]']
            }
        }
        return baseline
        
    def hash_file(self, filepath):
        """Calculate SHA256 hash of file"""
        sha256_hash = hashlib.sha256()
        with open(filepath, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
        
    def check_compliance(self):
        """Verify SSH configuration compliance against the baseline.

        Three checks are run, and every finding is appended as a dict to
        ``self.compliance_failures`` (the list is not cleared first):

        1. ``config_modified`` - the current hash of
           ``/etc/ssh/sshd_config`` differs from the baseline hash.
        2. ``setting_mismatch`` - a required setting does not appear with
           the expected value in the output of ``sshd -T``.
        3. ``forbidden_user_active`` - a forbidden account in
           ``/etc/passwd`` has a shell other than ``/bin/false`` or
           ``/usr/sbin/nologin``.

        Raises:
            OSError: if the sshd configuration or ``/etc/passwd`` cannot be
                read, or the ``sshd`` binary cannot be started.
            subprocess.CalledProcessError: if ``sshd -T`` exits non-zero.
        """
        baseline = self.config_baseline

        # Check configuration file integrity
        current_hash = self.hash_file('/etc/ssh/sshd_config')
        if current_hash != baseline['config_hash']:
            self.compliance_failures.append({
                'type': 'config_modified',
                'details': 'SSH configuration file has been modified',
                'severity': 'high'
            })

        # Verify required settings against the effective configuration
        # printed by `sshd -T` (one "keyword value" pair per line).
        current_config = subprocess.check_output(['sshd', '-T'], text=True)
        for setting, expected in baseline['required_settings'].items():
            # Escape both parts so a value containing regex metacharacters
            # is matched literally rather than interpreted as a pattern.
            pattern = rf'^{re.escape(setting.lower())}\s+{re.escape(expected)}$'
            if not re.search(pattern, current_config, re.MULTILINE | re.IGNORECASE):
                self.compliance_failures.append({
                    'type': 'setting_mismatch',
                    'setting': setting,
                    'expected': expected,
                    'severity': 'high'
                })

        # Check for forbidden users with SSH access. The context manager
        # closes /etc/passwd even if a line fails to parse.
        with open('/etc/passwd', 'r') as passwd_file:
            for line in passwd_file:
                fields = line.split(':')
                user = fields[0]
                if user not in baseline['forbidden_users']:
                    continue
                # The login shell is the last colon-separated field.
                shell = fields[-1].strip()
                if shell not in ['/bin/false', '/usr/sbin/nologin']:
                    self.compliance_failures.append({
                        'type': 'forbidden_user_active',
                        'user': user,
                        'severity': 'critical'
                    })
                    
    def monitor_active_sessions(self):
        """Record newly seen established connections of ``sshd`` processes.

        Every process named ``sshd`` is inspected. For each connection in
        state ``ESTABLISHED`` whose ``"remote_ip:remote_port"`` key is not
        yet in ``self.active_sessions``, a session record is stored and
        ``self.alert_new_session()`` is called with it.

        NOTE(review): entries are only ever added here; sessions that have
        ended stay in ``self.active_sessions`` unless removed elsewhere.
        """
        # Get current SSH processes
        for proc in psutil.process_iter(['pid', 'name', 'username', 'connections']):
            if proc.info['name'] != 'sshd':
                continue

            # psutil stores None for attributes it could not read (e.g. on
            # access denied), so the key exists and a `.get()` default
            # would not apply; fall back to an empty list explicitly.
            connections = proc.info.get('connections') or []
            for conn in connections:
                # Skip sockets without a remote endpoint.
                if conn.status != 'ESTABLISHED' or not conn.raddr:
                    continue

                remote_ip, remote_port = conn.raddr[0], conn.raddr[1]
                session_key = f"{remote_ip}:{remote_port}"
                if session_key in self.active_sessions:
                    continue

                session = {
                    'start_time': datetime.now(),
                    'pid': proc.info['pid'],
                    'user': proc.info['username'],
                    'remote_ip': remote_ip,
                    'remote_port': remote_port
                }
                self.active_sessions[session_key] = session
                self.alert_new_session(session)
                            
    def generate_compliance_report(self):
        """Build the compliance report, persist it as JSON and return it.

        The report contains the generation timestamp, an overall
        ``compliant`` flag (true only when no failures are recorded), the
        recorded failures, the tracked sessions and the recommendations
        from ``generate_recommendations()``. It is written to
        ``/var/log/ssh-compliance.json``; values the JSON encoder cannot
        handle natively are converted with ``str``.
        """
        failures = self.compliance_failures

        # Keys are filled in a fixed order so the JSON layout is stable.
        report = {}
        report['timestamp'] = datetime.now().isoformat()
        report['compliant'] = len(failures) == 0
        report['failures'] = failures
        report['active_sessions'] = list(self.active_sessions.values())
        report['recommendations'] = self.generate_recommendations()

        with open('/var/log/ssh-compliance.json', 'w') as report_file:
            json.dump(report, report_file, indent=2, default=str)

        return report
        
    def generate_recommendations(self):
        """Generate security recommendations based on findings"""
        recommendations = []
        
        if any(f['type'] == 'config_modified' for f in self.compliance_failures):
            recommendations.append({
                'priority': 'high',
                'action': 'Review and approve SSH configuration changes',
                'command': 'diff /etc/ssh/sshd_config /etc/ssh/sshd_config.baseline'
            })
            
        if any(f['type'] == 'forbidden_user_active' for f in self.compliance_failures):
            recommendations.append({
                'priority': 'critical',
                'action': 'Disable shell access for system accounts',
                'command': 'usermod -s /usr/sbin/nologin <username>'
            })
            
        return recommendations