Monitoring and Compliance
Monitoring and Compliance
Continuous monitoring ensures hardening measures remain effective and comply with security policies. Implement comprehensive logging and monitoring systems that provide visibility into SSH activity.
Deploy advanced SSH monitoring:
#!/usr/bin/env python3
# ssh-monitor.py
# Advanced SSH monitoring and compliance checking
import re
import os
import json
import time
import hashlib
from datetime import datetime, timedelta
import psutil
import subprocess
class SSHMonitor:
def __init__(self):
self.config_baseline = self.create_baseline()
self.active_sessions = {}
self.compliance_failures = []
def create_baseline(self):
"""Create configuration baseline for comparison"""
baseline = {
'config_hash': self.hash_file('/etc/ssh/sshd_config'),
'required_settings': {
'PermitRootLogin': 'no',
'PasswordAuthentication': 'no',
'PubkeyAuthentication': 'yes',
'PermitEmptyPasswords': 'no',
'Protocol': '2',
'StrictModes': 'yes',
'MaxAuthTries': '3',
'ClientAliveInterval': '300',
'ClientAliveCountMax': '2'
},
'forbidden_users': ['root', 'admin', 'guest'],
'required_algorithms': {
'ciphers': ['[email protected]', '[email protected]'],
'macs': ['[email protected]', '[email protected]'],
'kex': ['curve25519-sha256', '[email protected]']
}
}
return baseline
def hash_file(self, filepath):
"""Calculate SHA256 hash of file"""
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def check_compliance(self):
"""Verify SSH configuration compliance"""
# Check configuration file integrity
current_hash = self.hash_file('/etc/ssh/sshd_config')
if current_hash != self.config_baseline['config_hash']:
self.compliance_failures.append({
'type': 'config_modified',
'details': 'SSH configuration file has been modified',
'severity': 'high'
})
# Verify required settings
current_config = subprocess.check_output(['sshd', '-T'], text=True)
for setting, expected in self.config_baseline['required_settings'].items():
pattern = f'^{setting.lower()}\\s+{expected}$'
if not re.search(pattern, current_config, re.MULTILINE | re.IGNORECASE):
self.compliance_failures.append({
'type': 'setting_mismatch',
'setting': setting,
'expected': expected,
'severity': 'high'
})
# Check for forbidden users with SSH access
passwd_file = open('/etc/passwd', 'r')
for line in passwd_file:
user = line.split(':')[0]
if user in self.config_baseline['forbidden_users']:
# Check if user has valid shell
shell = line.split(':')[-1].strip()
if shell not in ['/bin/false', '/usr/sbin/nologin']:
self.compliance_failures.append({
'type': 'forbidden_user_active',
'user': user,
'severity': 'critical'
})
def monitor_active_sessions(self):
"""Monitor active SSH sessions"""
# Get current SSH processes
for proc in psutil.process_iter(['pid', 'name', 'username', 'connections']):
if proc.info['name'] == 'sshd':
connections = proc.info.get('connections', [])
for conn in connections:
if conn.status == 'ESTABLISHED':
session_key = f"{conn.raddr[0]}:{conn.raddr[1]}"
if session_key not in self.active_sessions:
self.active_sessions[session_key] = {
'start_time': datetime.now(),
'pid': proc.info['pid'],
'user': proc.info['username'],
'remote_ip': conn.raddr[0],
'remote_port': conn.raddr[1]
}
self.alert_new_session(self.active_sessions[session_key])
def generate_compliance_report(self):
"""Generate compliance report"""
report = {
'timestamp': datetime.now().isoformat(),
'compliant': len(self.compliance_failures) == 0,
'failures': self.compliance_failures,
'active_sessions': list(self.active_sessions.values()),
'recommendations': self.generate_recommendations()
}
with open('/var/log/ssh-compliance.json', 'w') as f:
json.dump(report, f, indent=2, default=str)
return report
def generate_recommendations(self):
"""Generate security recommendations based on findings"""
recommendations = []
if any(f['type'] == 'config_modified' for f in self.compliance_failures):
recommendations.append({
'priority': 'high',
'action': 'Review and approve SSH configuration changes',
'command': 'diff /etc/ssh/sshd_config /etc/ssh/sshd_config.baseline'
})
if any(f['type'] == 'forbidden_user_active' for f in self.compliance_failures):
recommendations.append({
'priority': 'critical',
'action': 'Disable shell access for system accounts',
'command': 'usermod -s /usr/sbin/nologin <username>'
})
return recommendations